From c510eb363a03b2662e6db8541dbac3d197bf25e2 Mon Sep 17 00:00:00 2001 From: Dillon DuPont Date: Thu, 15 May 2025 14:57:06 -0400 Subject: [PATCH] added diorama to server side --- .../computer_server/diorama/__init__.py | 3 + .../computer_server/diorama/base.py | 4 + .../computer_server/diorama/diorama.py | 355 +++++ .../diorama/diorama_computer.py | 26 + .../computer_server/diorama/draw.py | 1211 +++++++++++++++++ .../computer_server/diorama/macos.py | 29 + .../computer_server/diorama/safezone.py | 199 +++ .../computer_server/handlers/factory.py | 16 +- libs/computer-server/computer_server/main.py | 3 +- libs/computer/computer/computer.py | 13 + libs/computer/computer/diorama_computer.py | 89 ++ libs/computer/computer/interface/macos.py | 4 + 12 files changed, 1944 insertions(+), 8 deletions(-) create mode 100644 libs/computer-server/computer_server/diorama/__init__.py create mode 100644 libs/computer-server/computer_server/diorama/base.py create mode 100644 libs/computer-server/computer_server/diorama/diorama.py create mode 100644 libs/computer-server/computer_server/diorama/diorama_computer.py create mode 100644 libs/computer-server/computer_server/diorama/draw.py create mode 100644 libs/computer-server/computer_server/diorama/macos.py create mode 100644 libs/computer-server/computer_server/diorama/safezone.py create mode 100644 libs/computer/computer/diorama_computer.py diff --git a/libs/computer-server/computer_server/diorama/__init__.py b/libs/computer-server/computer_server/diorama/__init__.py new file mode 100644 index 00000000..3608c381 --- /dev/null +++ b/libs/computer-server/computer_server/diorama/__init__.py @@ -0,0 +1,3 @@ +from .diorama import Diorama + +__all__ = ["Diorama"] diff --git a/libs/computer-server/computer_server/diorama/base.py b/libs/computer-server/computer_server/diorama/base.py new file mode 100644 index 00000000..3ca01133 --- /dev/null +++ b/libs/computer-server/computer_server/diorama/base.py @@ -0,0 +1,4 @@ +class 
class Diorama:
    """A virtual desktop restricted to a whitelist of macOS applications.

    Each instance owns an ``Interface`` (screenshot, mouse and keyboard helpers
    that work in *screenshot* coordinates) and a ``DioramaComputer`` wrapper.
    Commands from every instance are pushed onto one class-level queue and
    executed one at a time by ``_scheduler_loop``.
    """

    _scheduler_queue = None
    _scheduler_task = None
    _loop = None
    _scheduler_started = False

    # Pointer actions; each name is also the automation handler method name.
    _POINTER_ACTIONS = ("left_click", "right_click", "double_click", "move_cursor", "drag_to")
    # Every action the scheduler knows how to execute.
    _ACTIONS = _POINTER_ACTIONS + (
        "screenshot", "type_text", "press_key", "hotkey", "get_cursor_position",
    )

    @staticmethod
    def _normalize_app_list(apps) -> list:
        """Flatten ``create_from_apps`` arguments into a flat list of app names.

        ``("A", "B")``, ``(["A", "B"],)`` and ``"A"`` all become a plain list of
        names, so the whitelist membership test compares strings with strings.
        """
        if isinstance(apps, str):
            return [apps]
        names = []
        for entry in apps:
            if isinstance(entry, str):
                names.append(entry)
            else:
                # A list/tuple of names passed as a single positional argument.
                names.extend(entry)
        return names

    @classmethod
    def create_from_apps(cls, *args) -> "DioramaComputer":
        """Create a diorama for the given apps and return its computer wrapper.

        App names may be given as separate arguments
        (``create_from_apps("A", "B")``) or as one list
        (``create_from_apps(["A", "B"])``).
        """
        cls._ensure_scheduler()
        return cls(cls._normalize_app_list(args)).computer

    def __init__(self, app_list):
        self.app_list = app_list
        self.interface = self.Interface(self)
        self.computer = DioramaComputer(self)
        self.focus_context = None

    @classmethod
    def _ensure_scheduler(cls):
        """Start the shared scheduler task the first time it is needed."""
        if not cls._scheduler_started:
            logger.info("Starting Diorama scheduler loop…")
            cls._scheduler_queue = asyncio.Queue()
            cls._loop = asyncio.get_event_loop()
            cls._scheduler_task = cls._loop.create_task(cls._scheduler_loop())
            cls._scheduler_started = True

    @staticmethod
    async def _run_action(action, args):
        """Execute one known command and return the value for its future.

        Raises:
            ValueError: if ``action`` is not one of ``Diorama._ACTIONS``.
        """
        if action == "screenshot":
            app_whitelist = list(args["app_list"])
            logger.info(f"Taking screenshot for apps: {app_whitelist}")
            result, img = capture_all_apps(
                app_whitelist=app_whitelist,
                save_to_disk=False,
                take_focus=False
            )
            logger.info("Screenshot complete.")
            return result, img
        if action in Diorama._POINTER_ACTIONS:
            x = args.get("x")
            y = args.get("y")
            handler = getattr(automation_handler, action)
            if action == "drag_to":
                await handler(x, y, duration=args.get("duration", 0.5))
            else:
                await handler(x, y)
            return None
        if action == "type_text":
            await automation_handler.type_text(args.get("text"))
            return None
        if action == "press_key":
            await automation_handler.press_key(args.get("key"))
            return None
        if action == "hotkey":
            await automation_handler.hotkey(args.get("keys", []))
            return None
        if action == "get_cursor_position":
            return await automation_handler.get_cursor_position()
        raise ValueError(f"Unknown action: {action}")

    @classmethod
    async def _scheduler_loop(cls):
        """Process queued commands forever, one at a time.

        Each command is a dict with ``action``, ``arguments`` and an optional
        ``future`` that receives the result or the exception.
        """
        while True:
            cmd = await cls._scheduler_queue.get()
            action = cmd.get("action")
            args = cmd.get("arguments", {})
            future = cmd.get("future")
            logger.info(f"Processing command: {action} | args={args}")

            def resolve(value=None, _future=future):
                # A caller may have cancelled the future while it was queued;
                # completing it again would raise InvalidStateError.
                if _future is not None and not _future.done():
                    _future.set_result(value)

            def reject(exc, _future=future):
                if _future is not None and not _future.done():
                    _future.set_exception(exc)

            try:
                app_whitelist = args.get("app_list", [])

                all_windows = get_all_windows()
                running_apps = get_running_apps()
                frontmost_app, active_app_to_use, active_app_pid = get_frontmost_and_active_app(all_windows, running_apps, app_whitelist)
                focus_context = AppActivationContext(active_app_pid, active_app_to_use, logger)

                with focus_context:
                    try:
                        if action in cls._ACTIONS:
                            resolve(await cls._run_action(action, args))
                        else:
                            logger.warning(f"Unknown action: {action}")
                            reject(ValueError(f"Unknown action: {action}"))
                    except Exception as e:
                        logger.error(f"Exception during {action}: {e}", exc_info=True)
                        reject(e)
            except Exception as e:
                # Window enumeration or focus switching failed. Report it to the
                # caller and keep the scheduler alive for the next command.
                logger.error(f"Exception during {action}: {e}", exc_info=True)
                reject(e)

    class Interface:
        """Screenshot, mouse and keyboard API for one diorama.

        Pointer coordinates passed in are in screenshot space and are mapped to
        screen space using the hitboxes returned with the latest screenshot.
        """

        def __init__(self, diorama):
            self._diorama = diorama

            self._scene_hitboxes = []
            self._scene_size = None

        async def _send_cmd(self, action, arguments=None):
            """Queue a command for the scheduler and wait for its result.

            Raises:
                asyncio.CancelledError: re-raised after logging, so that task
                    cancellation keeps propagating to the caller.
            """
            Diorama._ensure_scheduler()
            future = asyncio.get_running_loop().create_future()
            logger.info(f"Enqueuing {action} command for apps: {self._diorama.app_list}")
            await Diorama._scheduler_queue.put({
                "action": action,
                "arguments": {"app_list": self._diorama.app_list, **(arguments or {})},
                "future": future
            })
            try:
                return await future
            except asyncio.CancelledError:
                logger.warning(f"Command was cancelled: {action}")
                raise

        async def screenshot(self, as_bytes: bool = True) -> Union[bytes, "Image.Image"]:
            """Capture the diorama and refresh the cached hitboxes and size.

            Args:
                as_bytes: return PNG-encoded bytes when true, otherwise the
                    image object produced by the capture.
            """
            result, img = await self._send_cmd("screenshot")
            self._scene_hitboxes = result.get("hitboxes", [])
            self._scene_size = img.size

            if not as_bytes:
                return img
            buffer = io.BytesIO()
            img.save(buffer, format="PNG")
            return buffer.getvalue()

        async def left_click(self, x, y):
            sx, sy = await self.to_screen_coordinates(x, y)
            await self._send_cmd("left_click", {"x": sx, "y": sy})

        async def right_click(self, x, y):
            sx, sy = await self.to_screen_coordinates(x, y)
            await self._send_cmd("right_click", {"x": sx, "y": sy})

        async def double_click(self, x, y):
            sx, sy = await self.to_screen_coordinates(x, y)
            await self._send_cmd("double_click", {"x": sx, "y": sy})

        async def move_cursor(self, x, y):
            sx, sy = await self.to_screen_coordinates(x, y)
            await self._send_cmd("move_cursor", {"x": sx, "y": sy})

        async def drag_to(self, x, y, duration=0.5):
            sx, sy = await self.to_screen_coordinates(x, y)
            await self._send_cmd("drag_to", {"x": sx, "y": sy, "duration": duration})

        async def get_cursor_position(self):
            return await self._send_cmd("get_cursor_position")

        async def type_text(self, text):
            await self._send_cmd("type_text", {"text": text})

        async def press_key(self, key):
            await self._send_cmd("press_key", {"key": key})

        async def hotkey(self, *keys):
            await self._send_cmd("hotkey", {"keys": list(keys)})

        async def get_screen_size(self) -> dict[str, int]:
            """Return the size of the latest screenshot, capturing one if needed."""
            if not self._scene_size:
                await self.screenshot()
            return {"width": self._scene_size[0], "height": self._scene_size[1]}

        @staticmethod
        def _remap(hitboxes, x, y, src_key, dst_key):
            """Translate ``(x, y)`` from the ``src_key`` rect to the ``dst_key`` rect.

            Hitboxes are tried last-to-first. Entries whose source or
            destination rect is missing or not four numbers are skipped.

            Returns:
                ``(hitbox, (tx, ty))`` for the first containing hitbox, or
                ``(None, (x, y))`` when no hitbox contains the point.
            """
            for box in reversed(hitboxes):
                src = box.get(src_key)
                dst = box.get(dst_key)
                if not src or len(src) != 4:
                    continue
                if not dst or len(dst) != 4:
                    continue
                x0, y0, x1, y1 = src
                if x0 <= x <= x1 and y0 <= y <= y1:
                    # Pure translation: keep the offset from the source origin.
                    return box, (dst[0] + (x - x0), dst[1] + (y - y0))
            return None, (x, y)

        async def to_screen_coordinates(self, x: float, y: float) -> tuple[float, float]:
            """Convert screenshot coordinates to screen coordinates.

            Args:
                x: X absolute coordinate in screenshot space
                y: Y absolute coordinate in screenshot space

            Returns:
                tuple[float, float]: (x, y) absolute coordinates in screen space;
                the input is returned unchanged when no hitbox contains it.
            """
            if not self._scene_hitboxes:
                await self.screenshot()  # get hitboxes
            box, point = self._remap(self._scene_hitboxes, x, y, "hitbox", "target")
            if box is not None:
                logger.info(f"Found hitbox: {box}")
            return point

        async def to_screenshot_coordinates(self, x: float, y: float) -> tuple[float, float]:
            """Convert screen coordinates to screenshot coordinates.

            Args:
                x: X absolute coordinate in screen space
                y: Y absolute coordinate in screen space

            Returns:
                tuple[float, float]: (x, y) absolute coordinates in screenshot
                space; the input is returned unchanged when no hitbox contains it.
            """
            if not self._scene_hitboxes:
                await self.screenshot()  # get hitboxes
            _, point = self._remap(self._scene_hitboxes, x, y, "target", "hitbox")
            return point
class DioramaComputer:
    """
    A minimal Computer-like interface for Diorama, compatible with ComputerAgent.

    Exposes ``interface`` (taken from the wrapped diorama), ``_initialized``,
    ``run()`` and the async context manager protocol.
    """

    def __init__(self, diorama):
        self.diorama = diorama
        self.interface = self.diorama.interface
        self._initialized = False

    async def __aenter__(self):
        # A coroutine body only executes under a running event loop, so no
        # loop setup is needed here; entering just marks the wrapper as ready.
        self._initialized = True
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Required for ``async with`` to work. There is nothing to release,
        # and exceptions raised inside the block are not suppressed.
        return False

    async def run(self):
        """Compatibility stub: initialize on first use and return ``self``."""
        if not self._initialized:
            await self.__aenter__()
        return self
# Timing decorator for profiling
def timing_decorator(func):
    """Wrap ``func`` so every call logs its duration at DEBUG level.

    The duration is measured with ``time.perf_counter()``, a monotonic clock,
    so system clock adjustments cannot distort it. The message is logged even
    when ``func`` raises, and the exception still propagates to the caller.

    Args:
        func: the callable to profile.

    Returns:
        A wrapper with the same signature and metadata as ``func``.
    """
    # Same logger object as the module-level "diorama.draw" logger.
    log = logging.getLogger("diorama.draw")

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        started = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            elapsed_time = time.perf_counter() - started
            log.debug(f"Function {func.__name__} took {elapsed_time:.4f} seconds to run")

    return wrapper
def CFAttributeToPyObject(attrValue):
    """Convert a CoreFoundation / Accessibility attribute value to Python.

    CF strings, booleans, arrays and numbers become ``str``, ``bool``, ``list``
    and ``int``/``float``; AXUIElement references are returned unchanged.
    AXValue sizes, points and ranges are parsed out of the value's textual
    description and returned as tuples.

    Returns:
        The converted value, or ``None`` when the type is unsupported or the
        description contains no ``{...}`` payload.
    """
    # Imported locally: the module's visible import list does not include it.
    import re

    def list_helper(list_value):
        return [CFAttributeToPyObject(item) for item in list_value]

    def number_helper(number_value):
        success, int_value = Foundation.CFNumberGetValue(  # type: ignore
            number_value, Foundation.kCFNumberIntType, None  # type: ignore
        )
        if success:
            return int(int_value)

        success, float_value = Foundation.CFNumberGetValue(  # type: ignore
            number_value, Foundation.kCFNumberDoubleType, None  # type: ignore
        )
        if success:
            return float(float_value)
        return None

    def axuielement_helper(element_value):
        return element_value

    cf_attr_type = Foundation.CFGetTypeID(attrValue)  # type: ignore
    cf_type_mapping = {
        Foundation.CFStringGetTypeID(): str,  # type: ignore
        Foundation.CFBooleanGetTypeID(): bool,  # type: ignore
        Foundation.CFArrayGetTypeID(): list_helper,  # type: ignore
        Foundation.CFNumberGetTypeID(): number_helper,  # type: ignore
        AXUIElementGetTypeID(): axuielement_helper,  # type: ignore
    }
    # Look the converter up first so that a KeyError raised *inside* a
    # converter is not mistaken for "unsupported CF type".
    converter = cf_type_mapping.get(cf_attr_type)
    if converter is not None:
        return converter(attrValue)

    # Not a supported CF type. Move on to AX value types.
    ax_attr_type = AXValueGetType(attrValue)
    ax_type_map = {
        kAXValueCGSizeType: Foundation.NSSizeFromString,  # type: ignore
        kAXValueCGPointType: Foundation.NSPointFromString,  # type: ignore
        kAXValueCFRangeType: Foundation.NSRangeFromString,  # type: ignore
    }
    parser = ax_type_map.get(ax_attr_type)
    if parser is None:
        return None
    search_result = re.search("{.*}", attrValue.description())
    if search_result is None:
        return None
    return tuple(parser(search_result.group()))


def element_attribute(element, attribute):
    """Read ``attribute`` from an accessibility element.

    The children attribute is first requested through the ranged
    ``AXUIElementCopyAttributeValues`` call (indices 0-999). If that call
    fails, the generic single-value call is tried as well.

    Returns:
        The attribute value (``NSArray`` results converted to Python lists),
        or ``None`` when the attribute cannot be read.
    """
    if attribute == kAXChildrenAttribute:
        err, value = AXUIElementCopyAttributeValues(element, attribute, 0, 999, None)
        if err == kAXErrorSuccess:
            if isinstance(value, Foundation.NSArray):  # type: ignore
                return CFAttributeToPyObject(value)
            return value
    err, value = AXUIElementCopyAttributeValue(element, attribute, None)
    if err == kAXErrorSuccess:
        if isinstance(value, Foundation.NSArray):  # type: ignore
            return CFAttributeToPyObject(value)
        return value
    return None
def get_app_info(app: NSRunningApplication) -> Dict[str, Any]:
    """Summarize a running application as a plain dictionary.

    Args:
        app: NSRunningApplication object

    Returns:
        Dictionary with the keys ``name``, ``bundle_id``, ``pid``, ``active``,
        ``hidden`` and ``terminated``, each read from the matching accessor.
    """
    # (result key, accessor method) pairs, queried in this order.
    accessors = (
        ("name", "localizedName"),
        ("bundle_id", "bundleIdentifier"),
        ("pid", "processIdentifier"),
        ("active", "isActive"),
        ("hidden", "isHidden"),
        ("terminated", "isTerminated"),
    )
    return {key: getattr(app, accessor)() for key, accessor in accessors}
def get_app_windows(app_pid: int, all_windows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Select the windows that belong to one application.

    Args:
        app_pid: Process ID of the application
        all_windows: List of all windows with z-order information

    Returns:
        The entries of ``all_windows`` whose ``"pid"`` equals ``app_pid``,
        in their original order.
    """
    owned = []
    for entry in all_windows:
        if entry["pid"] == app_pid:
            owned.append(entry)
    return owned
Any]] = None) -> Tuple[Optional[Image.Image], List[Dict[str, Any]]]: + """Capture a screenshot of the entire desktop using Quartz compositing, including dock as a second pass. + Args: + app_whitelist: Optional list of app names to include in the screenshot + Returns: + PIL Image of the desktop or None if capture failed + """ + import ctypes + + if dock_bounds is None: + dock_bounds = get_dock_bounds() + if dock_items is None: + dock_items = get_dock_items() + if menubar_bounds is None: + menubar_bounds = get_menubar_bounds() + if menubar_items is None: + menubar_items = get_menubar_items() + if all_windows is None: + all_windows = get_all_windows() + all_windows = all_windows[::-1] + all_windows = [window for window in all_windows if window["is_on_screen"]] + + main_screen = AppKit.NSScreen.mainScreen() + if main_screen: + frame = main_screen.frame() + screen_rect = Quartz.CGRectMake(0, 0, frame.size.width, frame.size.height) + else: + screen_rect = Quartz.CGRectNull + + # Screenshot-to-screen hitboxes + hitboxes = [] + + if app_whitelist is None: + # Single pass: desktop, menubar, app, dock + window_list = Foundation.CFArrayCreateMutable(None, len(all_windows), None) + for window in all_windows: + Foundation.CFArrayAppendValue(window_list, window["id"]) + cg_image = Quartz.CGWindowListCreateImageFromArray( + screen_rect, window_list, Quartz.kCGWindowImageDefault + ) + if cg_image is None: + return None + + # Create CGContext for compositing + width = int(frame.size.width) + height = int(frame.size.height) + color_space = Quartz.CGColorSpaceCreateWithName(Quartz.kCGColorSpaceSRGB) + cg_context = Quartz.CGBitmapContextCreate( + None, width, height, 8, 0, color_space, Quartz.kCGImageAlphaPremultipliedLast + ) + Quartz.CGContextDrawImage(cg_context, screen_rect, cg_image) + hitboxes.append({ + "hitbox": [0, 0, width, height], + "target": [0, 0, width, height] + }) + else: + # Filter out windows that are not in the whitelist + all_windows = [window for window in 
all_windows if window["owner"] in app_whitelist or window["role"] != "app"] + app_windows = [window for window in all_windows if window["role"] == "app"] + + dock_orientation = "side" if dock_bounds["width"] < dock_bounds["height"] else "bottom" + + menubar_length = max(item["bounds"]["x"] + item["bounds"]["width"] for item in menubar_items) + + # Calculate bounds of app windows + app_bounds = { + "x": min(window["bounds"]["x"] for window in app_windows) if app_windows else 0, + "y": min(window["bounds"]["y"] for window in app_windows) if app_windows else 0, + } + app_bounds["width"] = max(window["bounds"]["x"] + window["bounds"]["width"] for window in app_windows) - app_bounds["x"] if app_windows else 0 + app_bounds["height"] = max(window["bounds"]["y"] + window["bounds"]["height"] for window in app_windows) - app_bounds["y"] if app_windows else 0 + + # Set minimum bounds of 256x256 + app_bounds["width"] = max(app_bounds["width"], 256) + app_bounds["height"] = max(app_bounds["height"], 256) + + # Add dock bounds to app bounds + if dock_orientation == "bottom": + app_bounds["height"] += dock_bounds["height"] + 4 + elif dock_orientation == "side": + if dock_bounds["x"] > frame.size.width / 2: + app_bounds["width"] += dock_bounds["width"] + 4 + else: + app_bounds["x"] -= dock_bounds["width"] + 4 + app_bounds["width"] += dock_bounds["width"] + 4 + + # Add menubar bounds to app bounds + app_bounds["height"] += menubar_bounds["height"] + + # Make sure app bounds contains menubar bounds + app_bounds["width"] = max(app_bounds["width"], menubar_length) + + # Clamp bounds to screen + app_bounds["x"] = max(app_bounds["x"], 0) + app_bounds["y"] = max(app_bounds["y"], 0) + app_bounds["width"] = min(app_bounds["width"], frame.size.width - app_bounds["x"]) + app_bounds["height"] = min(app_bounds["height"], frame.size.height - app_bounds["y"] + menubar_bounds["height"]) + + # Create CGContext for compositing + width = int(app_bounds["width"]) + height = int(app_bounds["height"]) 
+ color_space = Quartz.CGColorSpaceCreateWithName(Quartz.kCGColorSpaceSRGB) + cg_context = Quartz.CGBitmapContextCreate( + None, width, height, 8, 0, color_space, Quartz.kCGImageAlphaPremultipliedLast + ) + + def _draw_layer(cg_context, all_windows, source_rect, target_rect): + """Draw a layer of windows from source_rect to target_rect on the given context.""" + window_list = Foundation.CFArrayCreateMutable(None, len(all_windows), None) + for window in all_windows: + Foundation.CFArrayAppendValue(window_list, window["id"]) + cg_image = Quartz.CGWindowListCreateImageFromArray( + source_rect, window_list, Quartz.kCGWindowImageDefault + ) + if cg_image is not None: + Quartz.CGContextDrawImage(cg_context, target_rect, cg_image) + + # --- FIRST PASS: desktop, apps --- + source_position = [app_bounds["x"], app_bounds["y"]] + source_size = [app_bounds["width"], app_bounds["height"]] + target_position = [ + 0, + min( + menubar_bounds["y"] + menubar_bounds["height"], + app_bounds["y"] + ) + ] + target_size = [app_bounds["width"], app_bounds["height"]] + + if dock_orientation == "bottom": + source_size[1] += dock_bounds["height"] + target_size[1] += dock_bounds["height"] + elif dock_orientation == "side": + if dock_bounds["x"] < frame.size.width / 2: + source_position[0] -= dock_bounds["width"] + target_position[0] -= dock_bounds["width"] + source_size[0] += dock_bounds["width"] + target_size[0] += dock_bounds["width"] + + app_source_rect = Quartz.CGRectMake( + source_position[0], source_position[1], source_size[0], source_size[1] + ) + app_target_rect = Quartz.CGRectMake( + target_position[0], app_bounds["height"] - target_position[1] - target_size[1], target_size[0], target_size[1] + ) + first_pass_windows = [w for w in all_windows if w["role"] == "app" or w["role"] == "desktop"] + _draw_layer(cg_context, first_pass_windows, app_source_rect, app_target_rect) + + hitboxes.append({ + "hitbox": [0, menubar_bounds["height"], app_bounds["width"], menubar_bounds["height"] + 
app_bounds["height"]], + "target": [ + app_source_rect.origin.x, + app_source_rect.origin.y, + app_source_rect.origin.x + app_bounds["width"], + app_source_rect.origin.y + app_bounds["height"] + ] + }) + + # --- SECOND PASS: menubar --- + allowed_roles = {"menubar"} + menubar_windows = [w for w in all_windows if w["role"] in allowed_roles] + menubar_source_rect = Quartz.CGRectMake( + 0, 0, app_bounds["width"], menubar_bounds["height"] + ) + menubar_target_rect = Quartz.CGRectMake( + 0, app_bounds["height"] - menubar_bounds["height"], app_bounds["width"], menubar_bounds["height"] + ) + _draw_layer(cg_context, menubar_windows, menubar_source_rect, menubar_target_rect) + + hitboxes.append({ + "hitbox": [0, 0, app_bounds["width"], menubar_bounds["height"]], + "target": [0, 0, app_bounds["width"], menubar_bounds["height"]] + }) + + # --- THIRD PASS: dock, filtered --- + # Step 1: Collect dock items to draw, with their computed target rects + dock_draw_items = [] + for index, item in enumerate(dock_items): + source_position = (item["bounds"]["x"], item["bounds"]["y"]) + source_size = (item["bounds"]["width"], item["bounds"]["height"]) + + # apply whitelist to middle items + if not (index == 0 or index == len(dock_items) - 1): + if item["subrole"] == "AXApplicationDockItem": + if item["title"] not in app_whitelist: + continue + elif item["subrole"] == "AXMinimizedWindowDockItem": + if not any(window["name"] == item["title"] and window["role"] == "app" and window["owner"] in app_whitelist for window in all_windows): + continue + elif item["subrole"] == "AXFolderDockItem": + continue + + # Preserve unscaled (original) source position and size before any modification + hitbox_position = source_position + hitbox_size = source_size + + screen_position = source_position + screen_size = source_size + + # stretch to screen size + padding = 32 + if dock_orientation == "bottom": + source_position = (source_position[0], 0) + source_size = (source_size[0], frame.size.height) + + 
hitbox_position = (source_position[0], app_bounds['height'] - hitbox_size[1]) + hitbox_size = (source_size[0], hitbox_size[1]) + + if index == 0: + source_size = (padding + source_size[0], source_size[1]) + source_position = (source_position[0] - padding, 0) + elif index == len(dock_items) - 1: + source_size = (source_size[0] + padding, source_size[1]) + source_position = (source_position[0], 0) + + elif dock_orientation == "side": + source_position = (0, source_position[1]) + source_size = (frame.size.width, source_size[1]) + + hitbox_position = ( + source_position[0] if dock_bounds['x'] < frame.size.width / 2 else app_bounds['width'] - hitbox_size[0], + source_position[1] + ) + hitbox_size = (hitbox_size[0], source_size[1]) + + if index == 0: + source_size = (source_size[0], padding + source_size[1]) + source_position = (0, source_position[1] - padding) + elif index == len(dock_items) - 1: + source_size = (source_size[0], source_size[1] + padding) + source_position = (0, source_position[1]) + + + # Compute the initial target position + target_position = source_position + target_size = source_size + + dock_draw_items.append({ + "item": item, + "index": index, + "source_position": source_position, + "source_size": source_size, + "target_size": target_size, + "target_position": target_position, # Will be updated after packing + "hitbox_position": hitbox_position, + "hitbox_size": hitbox_size, + "screen_position": screen_position, + "screen_size": screen_size, + }) + + # Step 2: Pack the target rects along the main axis, removing gaps + packed_positions = [] + if dock_orientation == "bottom": + # Pack left-to-right + x_cursor = 0 + for draw_item in dock_draw_items: + packed_positions.append((x_cursor, draw_item["target_position"][1])) + x_cursor += draw_item["target_size"][0] + packed_strip_length = x_cursor + # Center horizontally + x_offset = (app_bounds['width'] - packed_strip_length) / 2 + y_offset = (frame.size.height - app_bounds['height']) + for i, draw_item 
in enumerate(dock_draw_items): + px, py = packed_positions[i] + draw_item["target_position"] = (px + x_offset, py - y_offset) + + # Pack unscaled source rects + x_cursor = 0 + for draw_item in dock_draw_items: + draw_item["hitbox_position"] = (x_cursor, draw_item["hitbox_position"][1]) + x_cursor += draw_item["hitbox_size"][0] + packed_strip_length = x_cursor + # Center horizontally + x_offset = (app_bounds['width'] - packed_strip_length) / 2 + for i, draw_item in enumerate(dock_draw_items): + px, py = draw_item["hitbox_position"] + draw_item["hitbox_position"] = (px + x_offset, py) + elif dock_orientation == "side": + # Pack top-to-bottom + y_cursor = 0 + for draw_item in dock_draw_items: + packed_positions.append((draw_item["target_position"][0], y_cursor)) + y_cursor += draw_item["target_size"][1] + packed_strip_length = y_cursor + # Center vertically + y_offset = (app_bounds['height'] - packed_strip_length) / 2 + x_offset = 0 if dock_bounds['x'] < frame.size.width / 2 else frame.size.width - app_bounds['width'] + for i, draw_item in enumerate(dock_draw_items): + px, py = packed_positions[i] + draw_item["target_position"] = (px - x_offset, py + y_offset) + + # Pack unscaled source rects + y_cursor = 0 + for draw_item in dock_draw_items: + draw_item["hitbox_position"] = (draw_item["hitbox_position"][0], y_cursor) + y_cursor += draw_item["hitbox_size"][1] + packed_strip_length = y_cursor + # Center vertically + y_offset = (app_bounds['height'] - packed_strip_length) / 2 + for i, draw_item in enumerate(dock_draw_items): + px, py = draw_item["hitbox_position"] + draw_item["hitbox_position"] = (px, py + y_offset) + + dock_windows = [window for window in all_windows if window["role"] == "dock"] + # Step 3: Draw dock items using packed and recentered positions + for draw_item in dock_draw_items: + item = draw_item["item"] + source_position = draw_item["source_position"] + source_size = draw_item["source_size"] + target_position = draw_item["target_position"] + 
target_size = draw_item["target_size"] + + # flip target position y + target_position = (target_position[0], app_bounds['height'] - target_position[1] - target_size[1]) + + source_rect = Quartz.CGRectMake(*source_position, *source_size) + target_rect = Quartz.CGRectMake(*target_position, *target_size) + + _draw_layer(cg_context, dock_windows, source_rect, target_rect) + + hitbox_position = draw_item["hitbox_position"] + hitbox_size = draw_item["hitbox_size"] + + # Debug: Draw true hitbox rect (packed position, unscaled size) + # # Flip y like target_rect + # hitbox_position_flipped = ( + # hitbox_position[0], + # app_bounds['height'] - hitbox_position[1] - hitbox_size[1] + # ) + # hitbox_rect = Quartz.CGRectMake(*hitbox_position_flipped, *hitbox_size) + # Quartz.CGContextSetStrokeColorWithColor(cg_context, Quartz.CGColorCreateGenericRGB(0, 1, 0, 1)) + # Quartz.CGContextStrokeRect(cg_context, hitbox_rect) + + hitboxes.append({ + "hitbox": [*hitbox_position, hitbox_position[0] + hitbox_size[0], hitbox_position[1] + hitbox_size[1]], + "target": [*draw_item["screen_position"], draw_item["screen_position"][0] + draw_item["screen_size"][0], draw_item["screen_position"][1] + draw_item["screen_size"][1]] + }) + + + # Convert composited context to CGImage + final_cg_image = Quartz.CGBitmapContextCreateImage(cg_context) + ns_image = AppKit.NSImage.alloc().initWithCGImage_size_(final_cg_image, Foundation.NSZeroSize) + ns_data = ns_image.TIFFRepresentation() + bitmap_rep = AppKit.NSBitmapImageRep.imageRepWithData_(ns_data) + png_data = bitmap_rep.representationUsingType_properties_(AppKit.NSBitmapImageFileTypePNG, None) + image_data = io.BytesIO(png_data) + return Image.open(image_data), hitboxes + +@timing_decorator +def get_menubar_items(active_app_pid: int = None) -> List[Dict[str, Any]]: + """Get menubar items from the active application using Accessibility API + + Args: + active_app_pid: PID of the active application + + Returns: + List of dictionaries with menubar item 
information + """ + menubar_items = [] + + if active_app_pid is None: + # Get the frontmost application's PID if none provided + frontmost_app = NSWorkspace.sharedWorkspace().frontmostApplication() + if frontmost_app: + active_app_pid = frontmost_app.processIdentifier() + else: + logger.error("Error: Could not determine frontmost application") + return menubar_items + + # Create an accessibility element for the application + app_element = AXUIElementCreateApplication(active_app_pid) + if app_element is None: + logger.error(f"Error: Could not create accessibility element for PID {active_app_pid}") + return menubar_items + + # Get the menubar + menubar = element_attribute(app_element, kAXMenuBarAttribute) + if menubar is None: + logger.error(f"Error: Could not get menubar for application with PID {active_app_pid}") + return menubar_items + + # Get the menubar items + children = element_attribute(menubar, kAXChildrenAttribute) + if children is None: + logger.error("Error: Could not get menubar items") + return menubar_items + + # Process each menubar item + for i in range(len(children)): + item = children[i] + + # Get item title + title = element_attribute(item, kAXTitleAttribute) or "Untitled" + + # Create bounding box + bounds = { + "x": 0, + "y": 0, + "width": 0, + "height": 0 + } + + # Get item position + position_value = element_attribute(item, kAXPositionAttribute) + if position_value: + position_value = element_value(position_value, kAXValueCGPointType) + bounds["x"] = position_value.x + bounds["y"] = position_value.y + + # Get item size + size_value = element_attribute(item, kAXSizeAttribute) + if size_value: + size_value = element_value(size_value, kAXValueCGSizeType) + bounds["width"] = size_value.width + bounds["height"] = size_value.height + + + # Add to list + menubar_items.append({ + "title": title, + "bounds": bounds, + "index": i, + "app_pid": active_app_pid + }) + + return menubar_items + +@timing_decorator +def get_dock_items() -> List[Dict[str, 
Any]]: + """Get all items in the macOS Dock + + Returns: + List of dictionaries with Dock item information + """ + dock_items = [] + + # Find the Dock process + dock_pid = None + running_apps = get_running_apps() + for app in running_apps: + if app.localizedName() == "Dock" and app.bundleIdentifier() == "com.apple.dock": + dock_pid = app.processIdentifier() + break + + if dock_pid is None: + logger.error("Error: Could not find Dock process") + return dock_items + + # Create an accessibility element for the Dock + dock_element = AXUIElementCreateApplication(dock_pid) + if dock_element is None: + logger.error(f"Error: Could not create accessibility element for Dock (PID {dock_pid})") + return dock_items + + # Get the Dock's main element + dock_list = element_attribute(dock_element, kAXChildrenAttribute) + if dock_list is None or len(dock_list) == 0: + logger.error("Error: Could not get Dock children") + return dock_items + + # Find the Dock's application list (usually the first child) + dock_app_list = None + for child in dock_list: + role = element_attribute(child, kAXRoleAttribute) + if role == "AXList": + dock_app_list = child + break + + if dock_app_list is None: + logger.error("Error: Could not find Dock application list") + return dock_items + + # Get all items in the Dock + items = element_attribute(dock_app_list, kAXChildrenAttribute) + if items is None: + logger.error("Error: Could not get Dock items") + return dock_items + + # Process each Dock item + for i, item in enumerate(items): + # Get item attributes + title = element_attribute(item, kAXTitleAttribute) or "Untitled" + description = element_attribute(item, "AXDescription") or "" + role = element_attribute(item, kAXRoleAttribute) or "" + subrole = element_attribute(item, "AXSubrole") or "" + + # Create bounding box + bounds = { + "x": 0, + "y": 0, + "width": 0, + "height": 0 + } + + # Get item position + position_value = element_attribute(item, kAXPositionAttribute) + if position_value: + 
position_value = element_value(position_value, kAXValueCGPointType) + bounds["x"] = position_value.x + bounds["y"] = position_value.y + + # Get item size + size_value = element_attribute(item, kAXSizeAttribute) + if size_value: + size_value = element_value(size_value, kAXValueCGSizeType) + bounds["width"] = size_value.width + bounds["height"] = size_value.height + + # Determine if this is an application, file/folder, or separator + item_type = "unknown" + if subrole == "AXApplicationDockItem": + item_type = "application" + elif subrole == "AXFolderDockItem": + item_type = "folder" + elif subrole == "AXDocumentDockItem": + item_type = "document" + elif subrole == "AXSeparatorDockItem" or role == "AXSeparator": + item_type = "separator" + elif "trash" in title.lower(): + item_type = "trash" + + # Add to list + dock_items.append({ + "title": title, + "description": description, + "bounds": bounds, + "index": i, + "type": item_type, + "role": role, + "subrole": subrole + }) + + return dock_items + +class AppActivationContext: + def __init__(self, active_app_pid=None, active_app_to_use="", logger=None): + self.active_app_pid = active_app_pid + self.active_app_to_use = active_app_to_use + self.logger = logger + self.frontmost_app = None + + def __enter__(self): + from AppKit import NSWorkspace + if self.active_app_pid: + if self.logger and self.active_app_to_use: + self.logger.debug(f"Automatically activating app '{self.active_app_to_use}' for screenshot composition") + self.frontmost_app = NSWorkspace.sharedWorkspace().frontmostApplication() + running_apps_list = NSWorkspace.sharedWorkspace().runningApplications() + for app in running_apps_list: + if app.processIdentifier() == self.active_app_pid: + app.activateWithOptions_(0) + break + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.frontmost_app: + self.frontmost_app.activateWithOptions_(0) + +def get_frontmost_and_active_app(all_windows, running_apps, app_whitelist): + from AppKit import 
NSWorkspace + frontmost_app = NSWorkspace.sharedWorkspace().frontmostApplication() + + active_app_to_use = None + active_app_pid = None + + # Find the topmost (highest z_index) non-filtered app + for window in reversed(all_windows): + owner = window.get("owner") + role = window.get("role") + is_on_screen = window.get("is_on_screen") + + # Skip non-app windows + if role != "app": + continue + + # Skip not-on-screen windows + if not is_on_screen: + continue + + # Skip filtered apps + if app_whitelist is not None and owner not in app_whitelist: + continue + + # Found a suitable app + active_app_to_use = owner + active_app_pid = window.get("pid") + break + + # If no suitable app found, use Finder + if active_app_to_use is None: + active_app_to_use = "Finder" + for app in running_apps: + if app.localizedName() == "Finder": + active_app_pid = app.processIdentifier() + break + + return frontmost_app, active_app_to_use, active_app_pid + +def capture_all_apps(save_to_disk: bool = False, app_whitelist: List[str] = None, output_dir: str = None, take_focus: bool = True) -> Tuple[Dict[str, Any], Optional[Image.Image]]: + """Capture screenshots of all running applications + + Args: + save_to_disk: Whether to save screenshots to disk + app_whitelist: Optional list of app names to include in the recomposited screenshot + (will always include 'Window Server' and 'Dock') + + Returns: + Dictionary with application information and screenshots + Optional PIL Image of the recomposited screenshot + """ + result = { + "timestamp": time.time(), + "applications": [], + "windows": [], # New array to store all windows, including those without apps + "menubar_items": [], # New array to store menubar items + "dock_items": [] # New array to store dock items + } + + # Get all windows with z-order information + all_windows = get_all_windows() + + # Get all running applications + running_apps = get_running_apps() + + frontmost_app, active_app_to_use, active_app_pid = 
get_frontmost_and_active_app(all_windows, running_apps, app_whitelist) if take_focus else (None, None, None) + + # Use AppActivationContext to activate the app and restore focus + with AppActivationContext(active_app_pid, active_app_to_use, logger): + + # Process applications + for app in running_apps: + # Skip system apps without a bundle ID + if app.bundleIdentifier() is None: + continue + + app_info = get_app_info(app) + app_windows = get_app_windows(app.processIdentifier(), all_windows) + + app_data = { + "info": app_info, + "windows": [ window["id"] for window in app_windows ] + } + + result["applications"].append(app_data) + + # Add all windows to the result + result["windows"] = all_windows + + # Get menubar items from the active application + menubar_items = get_menubar_items(active_app_pid) + result["menubar_items"] = menubar_items + + # Get dock items + dock_items = get_dock_items() + result["dock_items"] = dock_items + + # Get menubar bounds + menubar_bounds = get_menubar_bounds() + result["menubar_bounds"] = menubar_bounds + + # Get dock bounds + dock_bounds = get_dock_bounds() + result["dock_bounds"] = dock_bounds + + # Capture the entire desktop using Quartz compositing + desktop_screenshot, hitboxes = draw_desktop_screenshot(app_whitelist, all_windows, dock_bounds, dock_items, menubar_bounds, menubar_items) + + result["hitboxes"] = hitboxes + + from PIL import Image, ImageDraw, ImageChops + def _draw_hitboxes(img, hitboxes, key="target"): + """ + Overlay opaque colored rectangles for each hitbox (using hitbox[key]) + with color depending on index, then multiply overlay onto img. 
+ Args: + img: PIL.Image (RGBA or RGB) + hitboxes: list of dicts with 'hitbox' and 'target' keys + key: 'hitbox' or 'target' + Returns: + PIL.Image with overlayed hitboxes (same mode/size as input) + """ + # Ensure RGBA mode for blending + base = img.convert("RGBA") + overlay = Image.new("RGBA", base.size, (0, 0, 0, 0)) + draw = ImageDraw.Draw(overlay) + + # Distinct colors for order + colors = [ + (255, 0, 0, 180), # Red + (0, 255, 0, 180), # Green + (0, 0, 255, 180), # Blue + (255, 255, 0, 180), # Yellow + (0, 255, 255, 180), # Cyan + (255, 0, 255, 180), # Magenta + (255, 128, 0, 180), # Orange + (128, 0, 255, 180), # Purple + (0, 128, 255, 180), # Sky blue + (128, 255, 0, 180), # Lime + ] + # Set minimum brightness for colors + min_brightness = 0 + colors = [ + (max(min_brightness, c[0]), max(min_brightness, c[1]), max(min_brightness, c[2]), c[3]) for c in colors + ] + + for i, h in enumerate(hitboxes): + rect = h.get(key) + color = colors[i % len(colors)] + if rect: + draw.rectangle(rect, fill=color) + + # Multiply blend overlay onto base + result = ImageChops.multiply(base, overlay) + return result + + # DEBUG: Save hitboxes to disk + if desktop_screenshot and save_to_disk and output_dir: + desktop_path = os.path.join(output_dir, "desktop.png") + desktop_screenshot.save(desktop_path) + result["desktop_screenshot"] = desktop_path + + logger.info(f"Saved desktop screenshot to {desktop_path}") + + if app_whitelist: + # Take screenshot without whitelist + desktop_screenshot_full, hitboxes_full = draw_desktop_screenshot( + None, all_windows, dock_bounds, dock_items, menubar_bounds, menubar_items) + + # Draw hitboxes on both images using overlay + img1 = _draw_hitboxes(desktop_screenshot.copy(), hitboxes, key="hitbox") + img2 = _draw_hitboxes(desktop_screenshot_full.copy(), hitboxes, key="target") if desktop_screenshot_full else None + + if img2 and hitboxes_full: + + # Compose side-by-side + from PIL import Image + width = img1.width + img2.width + height = 
max(img1.height, img2.height) + combined = Image.new('RGBA', (width, height), (0, 0, 0, 0)) + combined.paste(img1, (0, 0)) + combined.paste(img2, (img1.width, 0)) + side_by_side_path = os.path.join(output_dir, "side_by_side_hitboxes.png") + combined.save(side_by_side_path) + result["side_by_side_hitboxes"] = side_by_side_path + else: + # Overlay hitboxes using new function + hitbox_img = _draw_hitboxes(desktop_screenshot.copy(), hitboxes, key="hitbox") + hitbox_path = os.path.join(output_dir, "hitboxes.png") + hitbox_img.save(hitbox_path) + result["hitbox_screenshot"] = hitbox_path + + # Focus restoration is now handled by AppActivationContext + + return result, desktop_screenshot + +async def run_capture(): + """Run the screenshot capture asynchronously""" + # Parse command line arguments + parser = argparse.ArgumentParser(description="Capture screenshots of running macOS applications") + parser.add_argument("--output", "-o", help="Output directory for screenshots", default="app_screenshots") + parser.add_argument("--filter", "-f", nargs="+", help="Filter recomposited screenshot to only include specified apps") + parser.add_argument("--menubar", "-m", action="store_true", help="List menubar and status items with their bounding boxes") + parser.add_argument("--dock", "-d", action="store_true", help="List Dock items with their bounding boxes") + parser.add_argument("--demo", nargs="*", help="Demo mode: pass app names to capture individual and combinations, create mosaic PNG") + args = parser.parse_args() + + # Create output directory in the current directory if not absolute + if not os.path.isabs(args.output): + output_dir = os.path.join(os.getcwd(), args.output) + else: + output_dir = args.output + + # DEMO MODE: capture each app and all non-empty combinations, then mosaic + if args.demo: + from PIL import Image + demo_apps = args.demo + print(f"Running in DEMO mode for apps: {demo_apps}") + groups = [] + for item in demo_apps: + if "/" in item: + group = 
[x.strip() for x in item.split("/") if x.strip()] + else: + group = [item.strip()] + if group: + groups.append(group) + screenshots = [] + for group in groups: + print(f"Capturing for apps: {group}") + _, img = capture_all_apps(app_whitelist=group) + if img: + screenshots.append((group, img)) + if not screenshots: + print("No screenshots captured in demo mode.") + return + # Mosaic-pack: grid (rows of sqrt(N)) + def make_mosaic(images, pad=64, bg=(30,30,30)): + import rpack + sizes = [(img.width + pad, img.height + pad) for _, img in images] + positions = rpack.pack(sizes) + # Find the bounding box for the mosaic + max_x = max(x + w for (x, y), (w, h) in zip(positions, sizes)) + max_y = max(y + h for (x, y), (w, h) in zip(positions, sizes)) + mosaic = Image.new("RGBA", (max_x, max_y), bg) + for (group, img), (x, y) in zip(images, positions): + mosaic.paste(img, (x, y)) + return mosaic + mosaic_img = make_mosaic(screenshots) + mosaic_path = os.path.join(output_dir, "demo_mosaic.png") + os.makedirs(output_dir, exist_ok=True) + mosaic_img.save(mosaic_path) + print(f"Demo mosaic saved to: {mosaic_path}") + return + + # Capture all apps and save to disk, including a recomposited screenshot + print(f"Capturing screenshots of all running applications...") + print(f"Saving screenshots to: {output_dir}") + + # If filter is provided, show what we're filtering by + if args.filter: + print(f"Filtering recomposited screenshot to only include: {', '.join(args.filter)} (plus Window Server and Dock)") + + result, img = capture_all_apps( + save_to_disk=True, + app_whitelist=args.filter, + output_dir=output_dir, + take_focus=True + ) + + # Print summary + print(f"\nCapture complete!") + print(f"Captured {len(result['applications'])} applications") + + total_app_windows = sum(len(app["windows"]) for app in result["applications"]) + print(f"Total application windows captured: {total_app_windows}") + print(f"Total standalone windows captured: {len(result['windows'])}") + + # Print 
details of each application + print("\nApplication details:") + for app in result["applications"]: + app_info = app["info"] + windows = app["windows"] + print(f" - {app_info['name']} ({len(windows)} windows)") + + # Print recomposited screenshot path if available + if "desktop_screenshot" in result: + print(f"\nRecomposited screenshot saved to: {result['desktop_screenshot']}") + + # Print menubar items if requested + if args.menubar and "menubar_items" in result: + print("\nMenubar items:") + + # Find app name for the PID + app_name_by_pid = {} + for app in result["applications"]: + app_info = app["info"] + app_name_by_pid[app_info["pid"]] = app_info["name"] + + for item in result["menubar_items"]: + print(f" - {item['title']}") + print(f" Bounds: x={item['bounds']['x']}, y={item['bounds']['y']}, width={item['bounds']['width']}, height={item['bounds']['height']}") + + if "app_pid" in item: + app_name = app_name_by_pid.get(item["app_pid"], f"Unknown App (PID: {item['app_pid']})") + print(f" App: {app_name} (PID: {item['app_pid']})") + + if "window_id" in item: + print(f" Window ID: {item['window_id']}") + if "owner" in item: + print(f" Owner: {item['owner']}") + if "layer" in item and "z_index" in item: + print(f" Layer: {item['layer']}, Z-Index: {item['z_index']}") + print("") + + # Print dock items if requested + if args.dock and "dock_items" in result: + print("\nDock items:") + for item in result["dock_items"]: + print(f" - {item['title']} ({item['type']})") + print(f" Description: {item['description']}") + print(f" Bounds: x={item['bounds']['x']}, y={item['bounds']['y']}, width={item['bounds']['width']}, height={item['bounds']['height']}") + print(f" Role: {item['role']}, Subrole: {item['subrole']}") + print(f" Index: {item['index']}") + print("") + + # Save the metadata to a JSON file + metadata_path = os.path.join(output_dir, "metadata.json") + with open(metadata_path, "w") as f: + json.dump(result, f, indent=2) + + print(f"\nMetadata saved to: 
{metadata_path}") + +if __name__ == "__main__": + asyncio.run(run_capture()) \ No newline at end of file diff --git a/libs/computer-server/computer_server/diorama/macos.py b/libs/computer-server/computer_server/diorama/macos.py new file mode 100644 index 00000000..3d5a6f17 --- /dev/null +++ b/libs/computer-server/computer_server/diorama/macos.py @@ -0,0 +1,29 @@ +import platform +import sys +import platform +import inspect +from .diorama import Diorama +from .base import BaseDioramaHandler + +class MacOSDioramaHandler(BaseDioramaHandler): + """Handler for Diorama commands on macOS, using local diorama module.""" + async def diorama_cmd(self, action: str, arguments: dict = None) -> dict: + if platform.system().lower() != "darwin": + return {"success": False, "error": "Diorama is only supported on macOS."} + try: + app_list = arguments.get("app_list") if arguments else None + if not app_list: + return {"success": False, "error": "Missing 'app_list' in arguments"} + diorama = Diorama(app_list) + interface = diorama.interface + if not hasattr(interface, action): + return {"success": False, "error": f"Unknown diorama action: {action}"} + method = getattr(interface, action) + if inspect.iscoroutinefunction(method): + result = await method(**(arguments or {})) + else: + result = method(**(arguments or {})) + return {"success": True, "result": result} + except Exception as e: + import traceback + return {"success": False, "error": str(e), "trace": traceback.format_exc()} diff --git a/libs/computer-server/computer_server/diorama/safezone.py b/libs/computer-server/computer_server/diorama/safezone.py new file mode 100644 index 00000000..122b668f --- /dev/null +++ b/libs/computer-server/computer_server/diorama/safezone.py @@ -0,0 +1,199 @@ +#!/usr/bin/env python3 +""" +UI Safezone Helper - A utility to get accurate bounds for macOS UI elements + +This module provides helper functions to get accurate bounds for macOS UI elements +like the menubar and dock, which are needed for 
proper screenshot composition. +""" + +import sys +import time +from typing import Dict, Any, Optional, Tuple + +# Import Objective-C bridge libraries +try: + import AppKit + from ApplicationServices import ( + AXUIElementCreateSystemWide, + AXUIElementCreateApplication, + AXUIElementCopyAttributeValue, + AXUIElementCopyAttributeValues, + kAXChildrenAttribute, + kAXRoleAttribute, + kAXTitleAttribute, + kAXPositionAttribute, + kAXSizeAttribute, + kAXErrorSuccess, + AXValueGetType, + kAXValueCGSizeType, + kAXValueCGPointType, + AXUIElementGetTypeID, + AXValueGetValue, + kAXMenuBarAttribute, + ) + from AppKit import NSWorkspace, NSRunningApplication + import Foundation +except ImportError: + print("Error: This script requires PyObjC to be installed.") + print("Please install it with: pip install pyobjc") + sys.exit(1) + +# Constants for accessibility API +kAXErrorSuccess = 0 +kAXRoleAttribute = "AXRole" +kAXSubroleAttribute = "AXSubrole" +kAXTitleAttribute = "AXTitle" +kAXPositionAttribute = "AXPosition" +kAXSizeAttribute = "AXSize" +kAXChildrenAttribute = "AXChildren" +kAXMenuBarAttribute = "AXMenuBar" + + +def element_attribute(element, attribute): + """Get an attribute from an accessibility element""" + if attribute == kAXChildrenAttribute: + err, value = AXUIElementCopyAttributeValues(element, attribute, 0, 999, None) + if err == kAXErrorSuccess: + if isinstance(value, Foundation.NSArray): + return list(value) + else: + return value + err, value = AXUIElementCopyAttributeValue(element, attribute, None) + if err == kAXErrorSuccess: + return value + return None + + +def element_value(element, type): + """Get a value from an accessibility element""" + err, value = AXValueGetValue(element, type, None) + if err == True: + return value + return None + + +def get_element_bounds(element): + """Get the bounds of an accessibility element""" + bounds = { + "x": 0, + "y": 0, + "width": 0, + "height": 0 + } + + # Get position + position_value = element_attribute(element, 
kAXPositionAttribute) + if position_value: + position_value = element_value(position_value, kAXValueCGPointType) + if position_value: + bounds["x"] = position_value.x + bounds["y"] = position_value.y + + # Get size + size_value = element_attribute(element, kAXSizeAttribute) + if size_value: + size_value = element_value(size_value, kAXValueCGSizeType) + if size_value: + bounds["width"] = size_value.width + bounds["height"] = size_value.height + + return bounds + + +def find_dock_process(): + """Find the Dock process""" + running_apps = NSWorkspace.sharedWorkspace().runningApplications() + for app in running_apps: + if app.localizedName() == "Dock" and app.bundleIdentifier() == "com.apple.dock": + return app.processIdentifier() + return None + + +def get_menubar_bounds(): + """Get the bounds of the macOS menubar + + Returns: + Dictionary with x, y, width, height of the menubar + """ + # Get the system-wide accessibility element + system_element = AXUIElementCreateSystemWide() + + # Try to find the menubar + menubar = element_attribute(system_element, kAXMenuBarAttribute) + if menubar is None: + # If we can't get it directly, try through the frontmost app + frontmost_app = NSWorkspace.sharedWorkspace().frontmostApplication() + if frontmost_app: + app_pid = frontmost_app.processIdentifier() + app_element = AXUIElementCreateApplication(app_pid) + menubar = element_attribute(app_element, kAXMenuBarAttribute) + + if menubar is None: + print("Error: Could not get menubar") + # Return default menubar bounds as fallback + return {"x": 0, "y": 0, "width": 1800, "height": 24} + + # Get menubar bounds + return get_element_bounds(menubar) + + +def get_dock_bounds(): + """Get the bounds of the macOS Dock + + Returns: + Dictionary with x, y, width, height of the Dock + """ + dock_pid = find_dock_process() + if dock_pid is None: + print("Error: Could not find Dock process") + # Return empty bounds as fallback + return {"x": 0, "y": 0, "width": 0, "height": 0} + + # Create an 
accessibility element for the Dock + dock_element = AXUIElementCreateApplication(dock_pid) + if dock_element is None: + print(f"Error: Could not create accessibility element for Dock (PID {dock_pid})") + return {"x": 0, "y": 0, "width": 0, "height": 0} + + # Get the Dock's children + children = element_attribute(dock_element, kAXChildrenAttribute) + if not children or len(children) == 0: + print("Error: Could not get Dock children") + return {"x": 0, "y": 0, "width": 0, "height": 0} + + # Find the Dock's list (first child is usually the main dock list) + dock_list = None + for child in children: + role = element_attribute(child, kAXRoleAttribute) + if role == "AXList": + dock_list = child + break + + if dock_list is None: + print("Error: Could not find Dock list") + return {"x": 0, "y": 0, "width": 0, "height": 0} + + # Get the bounds of the dock list + return get_element_bounds(dock_list) + + +def get_ui_element_bounds(): + """Get the bounds of important UI elements like menubar and dock + + Returns: + Dictionary with menubar and dock bounds + """ + menubar_bounds = get_menubar_bounds() + dock_bounds = get_dock_bounds() + + return { + "menubar": menubar_bounds, + "dock": dock_bounds + } + + +if __name__ == "__main__": + # Example usage + bounds = get_ui_element_bounds() + print("Menubar bounds:", bounds["menubar"]) + print("Dock bounds:", bounds["dock"]) diff --git a/libs/computer-server/computer_server/handlers/factory.py b/libs/computer-server/computer_server/handlers/factory.py index 41e781f5..fecd9bbc 100644 --- a/libs/computer-server/computer_server/handlers/factory.py +++ b/libs/computer-server/computer_server/handlers/factory.py @@ -2,11 +2,13 @@ import platform import subprocess from typing import Tuple, Type from .base import BaseAccessibilityHandler, BaseAutomationHandler +from computer_server.diorama.base import BaseDioramaHandler # Conditionally import platform-specific handlers system = platform.system().lower() if system == 'darwin': from .macos 
@staticmethod
def create_handlers() -> Tuple[BaseAccessibilityHandler, BaseAutomationHandler, BaseDioramaHandler]:
    """Create and return appropriate handlers for the current OS.

    Returns:
        Tuple[BaseAccessibilityHandler, BaseAutomationHandler, BaseDioramaHandler]:
        the accessibility, automation, and diorama handlers for the current
        OS. Linux has no Diorama implementation, so it receives the
        ``BaseDioramaHandler`` placeholder.

    Raises:
        NotImplementedError: If the current OS is not supported
        RuntimeError: If unable to determine the current OS
    """
    os_type = HandlerFactory._get_current_os()

    if os_type == 'darwin':
        return MacOSAccessibilityHandler(), MacOSAutomationHandler(), MacOSDioramaHandler()
    # Must be an explicit check, not a catch-all ``else``: the Linux handler
    # classes are only imported when the platform is Linux, so falling into
    # this branch on any other OS would fail with NameError instead of the
    # documented NotImplementedError.
    if os_type == 'linux':
        return LinuxAccessibilityHandler(), LinuxAutomationHandler(), BaseDioramaHandler()

    raise NotImplementedError(f"OS '{os_type}' is not supported")
def create_desktop_from_apps(self, apps):
    """Build a virtual desktop restricted to the given applications.

    The result is a DioramaComputer constructed from this Computer and
    ``apps``; it exposes the Diorama interface while routing its work
    through diorama_cmds on the computer interface.

    Args:
        apps (list[str]): Names of the applications to include in the desktop.

    Returns:
        DioramaComputer: Proxy object bound to this Computer and ``apps``.
    """
    # Imported at call time, inside the method, rather than at module import.
    from .diorama_computer import DioramaComputer

    virtual_desktop = DioramaComputer(self, apps)
    return virtual_desktop
class DioramaComputer:
    """
    A Computer-compatible proxy for Diorama that sends commands over the ComputerInterface.

    Usable either through ``await obj.run()`` or as an async context manager
    (``async with obj as desktop: ...``).
    """

    def __init__(self, computer, apps):
        self.computer = computer
        self.apps = apps
        self.interface = DioramaComputerInterface(computer, apps)
        self._initialized = False

    async def __aenter__(self):
        self._initialized = True
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # ``async with`` requires __aexit__ alongside __aenter__; without it
        # entering the block fails before any user code runs. There is
        # nothing to release, so just clear the flag and never suppress
        # exceptions raised inside the block.
        self._initialized = False
        return False

    async def run(self):
        """Mark the proxy initialized (if it is not already) and return it."""
        if not self._initialized:
            await self.__aenter__()
        return self


class DioramaComputerInterface:
    """
    Diorama Interface proxy that sends diorama_cmds via the Computer's interface.

    Every call is forwarded as ``diorama_cmd(action, arguments)`` on the
    wrapped computer's ``_interface``; the app list is attached to each call.
    """

    def __init__(self, computer, apps):
        self.computer = computer
        self.apps = apps
        self._scene_hitboxes = []  # hitboxes reported by the last screenshot
        self._scene_size = None    # (width, height) of the last screenshot

    async def _send_cmd(self, action, arguments=None):
        """Send one diorama command and return the ``result`` field of the reply.

        Raises:
            RuntimeError: if the computer has no ``_interface`` yet, or the
                reply does not report success.
        """
        # Caller-supplied keys win over the default app_list entry.
        payload = {"app_list": self.apps, **(arguments or {})}
        # Use the computer's interface (must be initialized)
        iface = getattr(self.computer, "_interface", None)
        if iface is None:
            raise RuntimeError("Computer interface not initialized. Call run() first.")
        result = await iface.diorama_cmd(action, payload)
        if not result.get("success"):
            raise RuntimeError(f"Diorama command failed: {result.get('error')}")
        return result.get("result")

    async def screenshot(self, as_bytes=True):
        """Capture the scene; return the image bytes, or a PIL image if ``as_bytes`` is False.

        Also records the reported hitboxes and the image size on the instance.

        Raises:
            RuntimeError: if the command fails or the reply carries no image.
        """
        from PIL import Image
        import base64
        import io

        result = await self._send_cmd("screenshot")
        img_bytes = result.get("image_bytes") if result else None
        if not img_bytes:
            # A successful reply without a payload would otherwise surface as
            # an opaque AttributeError/TypeError further down.
            raise RuntimeError("Diorama command failed: screenshot returned no image data")
        if isinstance(img_bytes, str):
            # NOTE(review): a str payload is assumed to be base64 text, since
            # io.BytesIO cannot accept str - confirm against the server side.
            img_bytes = base64.b64decode(img_bytes)
        self._scene_hitboxes = result.get("hitboxes", [])
        # Assume server returns PNG bytes
        img = Image.open(io.BytesIO(img_bytes))
        self._scene_size = img.size
        return img_bytes if as_bytes else img

    async def get_screen_size(self):
        """Return ``{"width", "height"}`` of the scene, taking a screenshot if none is cached."""
        if not self._scene_size:
            await self.screenshot(as_bytes=False)
        return {"width": self._scene_size[0], "height": self._scene_size[1]}

    async def move_cursor(self, x, y):
        await self._send_cmd("move_cursor", {"x": x, "y": y})

    async def left_click(self, x=None, y=None):
        await self._send_cmd("left_click", {"x": x, "y": y})

    async def right_click(self, x=None, y=None):
        await self._send_cmd("right_click", {"x": x, "y": y})

    async def double_click(self, x=None, y=None):
        await self._send_cmd("double_click", {"x": x, "y": y})

    async def drag_to(self, x, y, duration=0.5):
        await self._send_cmd("drag_to", {"x": x, "y": y, "duration": duration})

    async def get_cursor_position(self):
        return await self._send_cmd("get_cursor_position")

    async def type_text(self, text):
        await self._send_cmd("type_text", {"text": text})

    async def press_key(self, key):
        await self._send_cmd("press_key", {"key": key})

    async def hotkey(self, *keys):
        await self._send_cmd("hotkey", {"keys": list(keys)})

    async def to_screen_coordinates(self, x, y):
        return await self._send_cmd("to_screen_coordinates", {"x": x, "y": y})