Merge pull request #280 from trycua/feature/computer/extensions

[Computer] Add App-Use to Computer interface
This commit is contained in:
ddupont
2025-05-31 14:39:40 -04:00
committed by GitHub
14 changed files with 1992 additions and 13 deletions

View File

@@ -0,0 +1,4 @@
class BaseDioramaHandler:
    """Fallback Diorama handler used on operating systems without Diorama support."""

    async def diorama_cmd(self, action: str, arguments: dict = None) -> dict:
        """Reject any Diorama command with a failure payload.

        Args:
            action: Name of the requested Diorama action (ignored).
            arguments: Arguments for the action (ignored).

        Returns:
            A dict with ``success`` set to False and an explanatory ``error``.
        """
        failure = {"success": False}
        failure["error"] = "Diorama is not supported on this OS yet."
        return failure

View File

@@ -0,0 +1,372 @@
#!/usr/bin/env python3
"""Diorama: A virtual desktop manager for macOS"""
import os
import asyncio
import logging
import sys
import io
from typing import Union
from PIL import Image, ImageDraw
from computer_server.diorama.draw import capture_all_apps, AppActivationContext, get_frontmost_and_active_app, get_all_windows, get_running_apps
from computer_server.diorama.diorama_computer import DioramaComputer
from computer_server.handlers.macos import *
# Simple, nicely formatted logging.
# NOTE(review): basicConfig() runs at import time, so merely importing this
# module configures the root logger (INFO level, stdout) for the whole process.
logging.basicConfig(
level=logging.INFO,
format='[%(asctime)s] [%(levelname)s] %(message)s',
datefmt='%H:%M:%S',
stream=sys.stdout
)
# Module-wide logger shared by the scheduler and the Diorama interface.
logger = logging.getLogger("diorama.virtual_desktop")
# Single automation handler instance used by the Diorama scheduler to perform
# mouse and keyboard actions. MacOSAutomationHandler is presumably provided by
# the star import from computer_server.handlers.macos above -- TODO confirm.
automation_handler = MacOSAutomationHandler()
class Diorama:
    """Virtual desktop restricted to a whitelist of macOS applications.

    Commands issued through any ``Diorama.Interface`` are pushed onto one
    class-level queue and executed one at a time by a single scheduler task,
    so that only one command manipulates application focus at any moment.
    """

    # Scheduler state shared by every Diorama instance.
    _scheduler_queue = None
    _scheduler_task = None
    _loop = None
    _scheduler_started = False

    @classmethod
    def create_from_apps(cls, *args) -> DioramaComputer:
        """Create a Diorama for the given app names and return its computer proxy.

        Args:
            *args: Application names. A single list/tuple of names is also
                accepted and is flattened, so ``create_from_apps(["A", "B"])``
                behaves like ``create_from_apps("A", "B")``.

        Returns:
            The ``DioramaComputer`` wrapping the new Diorama.
        """
        cls._ensure_scheduler()
        # Without flattening, a single list argument would yield a nested
        # app list such as (["A", "B"],) instead of ("A", "B").
        if len(args) == 1 and isinstance(args[0], (list, tuple)):
            args = tuple(args[0])
        return cls(args).computer

    def __init__(self, app_list):
        """Store the app whitelist and build the interface and computer proxy."""
        self.app_list = app_list
        self.interface = self.Interface(self)
        self.computer = DioramaComputer(self)
        self.focus_context = None

    @classmethod
    def _ensure_scheduler(cls):
        """Start the shared scheduler task unless a live one already exists.

        The scheduler is (re)created when it was never started, when its task
        has finished, or when it belongs to a different event loop than the
        current one (for example after a second ``asyncio.run``); otherwise
        queued commands would never be processed.
        """
        loop = asyncio.get_event_loop()
        task = cls._scheduler_task
        if (
            cls._scheduler_started
            and cls._loop is loop
            and task is not None
            and not task.done()
        ):
            return
        logger.info("Starting Diorama scheduler loop…")
        cls._scheduler_queue = asyncio.Queue()
        cls._loop = loop
        cls._scheduler_task = loop.create_task(cls._scheduler_loop())
        cls._scheduler_started = True

    @classmethod
    async def _scheduler_loop(cls):
        """Process queued commands forever, one at a time.

        Every failure while handling a command is reported through that
        command's future; the loop itself keeps running so later commands
        are not left waiting on a dead scheduler.
        """
        while True:
            cmd = await cls._scheduler_queue.get()
            action = cmd.get("action")
            args = cmd.get("arguments", {})
            future = cmd.get("future")
            logger.info(f"Processing command: {action} | args={args}")
            try:
                result = await cls._run_command(action, args)
            except Exception as e:
                logger.error(f"Exception during {action}: {e}", exc_info=True)
                cls._settle(future, error=e)
            else:
                cls._settle(future, result=result)

    @staticmethod
    def _settle(future, result=None, error=None):
        """Resolve ``future`` unless it is missing or already done.

        A future whose awaiting caller was cancelled is already done; calling
        ``set_result``/``set_exception`` on it would raise InvalidStateError.
        """
        if future is None or future.done():
            return
        if error is not None:
            future.set_exception(error)
        else:
            future.set_result(result)

    @classmethod
    async def _run_command(cls, action, args):
        """Activate the relevant app, run ``action`` and return its result.

        Raises:
            Exception: Whatever window discovery, context activation or the
                action itself raised.
        """
        app_whitelist = args.get("app_list", [])
        all_windows = get_all_windows()
        running_apps = get_running_apps()
        _frontmost_app, active_app, active_pid = get_frontmost_and_active_app(
            all_windows, running_apps, app_whitelist
        )
        failure = None
        with AppActivationContext(active_pid, active_app, logger):
            # Capture the error here so the activation context always exits
            # without an in-flight exception, as it did before.
            try:
                return await cls._dispatch(action, args)
            except Exception as exc:
                failure = exc
        raise failure

    @staticmethod
    async def _dispatch(action, args):
        """Execute one action through the automation handler.

        Returns:
            ``(result, img)`` for ``screenshot``, the handler's value for
            ``get_cursor_position``, and None for every other known action.

        Raises:
            ValueError: If ``action`` is not a known action name.
        """
        if action == "screenshot":
            app_whitelist = list(args["app_list"])
            logger.info(f"Taking screenshot for apps: {app_whitelist}")
            result, img = capture_all_apps(
                app_whitelist=app_whitelist,
                save_to_disk=False,
                take_focus=False,
            )
            logger.info("Screenshot complete.")
            return result, img

        # Mouse actions
        if action in ("left_click", "right_click", "double_click", "move_cursor"):
            await getattr(automation_handler, action)(args.get("x"), args.get("y"))
            return None
        if action == "drag_to":
            await automation_handler.drag_to(
                args.get("x"), args.get("y"), duration=args.get("duration", 0.5)
            )
            return None
        if action in ("scroll_up", "scroll_down"):
            await getattr(automation_handler, action)(args.get("clicks", 1))
            return None

        # Keyboard actions
        if action == "type_text":
            await automation_handler.type_text(args.get("text"))
            return None
        if action == "press_key":
            await automation_handler.press_key(args.get("key"))
            return None
        if action == "hotkey":
            await automation_handler.hotkey(args.get("keys", []))
            return None

        if action == "get_cursor_position":
            return await automation_handler.get_cursor_position()

        logger.warning(f"Unknown action: {action}")
        raise ValueError(f"Unknown action: {action}")

    class Interface():
        """Computer-style interface that routes every call through the scheduler."""

        def __init__(self, diorama):
            self._diorama = diorama
            # Hitboxes and size of the most recent screenshot; used to map
            # between screenshot coordinates and screen coordinates.
            self._scene_hitboxes = []
            self._scene_size = None

        async def _send_cmd(self, action, arguments=None):
            """Queue ``action`` for the scheduler and wait for its result.

            Raises:
                asyncio.CancelledError: If the waiting task is cancelled. The
                    cancellation is logged and propagated rather than being
                    turned into a ``None`` result.
                Exception: Whatever the scheduler recorded for this command.
            """
            Diorama._ensure_scheduler()
            future = asyncio.get_running_loop().create_future()
            logger.info(f"Enqueuing {action} command for apps: {self._diorama.app_list}")
            await Diorama._scheduler_queue.put({
                "action": action,
                "arguments": {"app_list": self._diorama.app_list, **(arguments or {})},
                "future": future
            })
            try:
                return await future
            except asyncio.CancelledError:
                logger.warning(f"Command was cancelled: {action}")
                raise

        async def screenshot(self, as_bytes: bool = True) -> Union[str, Image.Image]:
            """Capture the whitelisted apps and refresh the cached hitboxes.

            Args:
                as_bytes: When True return the PNG as a base64 ASCII string,
                    otherwise return the PIL image itself.
            """
            import base64
            result, img = await self._send_cmd("screenshot")
            self._scene_hitboxes = result.get("hitboxes", [])
            self._scene_size = img.size
            if not as_bytes:
                return img
            # PIL image -> PNG bytes -> base64 so the result is JSON friendly.
            buffer = io.BytesIO()
            img.save(buffer, format="PNG")
            return base64.b64encode(buffer.getvalue()).decode("ascii")

        async def left_click(self, x, y):
            sx, sy = await self.to_screen_coordinates(x, y)
            await self._send_cmd("left_click", {"x": sx, "y": sy})

        async def right_click(self, x, y):
            sx, sy = await self.to_screen_coordinates(x, y)
            await self._send_cmd("right_click", {"x": sx, "y": sy})

        async def double_click(self, x, y):
            sx, sy = await self.to_screen_coordinates(x, y)
            await self._send_cmd("double_click", {"x": sx, "y": sy})

        async def move_cursor(self, x, y):
            sx, sy = await self.to_screen_coordinates(x, y)
            await self._send_cmd("move_cursor", {"x": sx, "y": sy})

        async def drag_to(self, x, y, duration=0.5):
            sx, sy = await self.to_screen_coordinates(x, y)
            await self._send_cmd("drag_to", {"x": sx, "y": sy, "duration": duration})

        async def get_cursor_position(self):
            return await self._send_cmd("get_cursor_position")

        async def type_text(self, text):
            await self._send_cmd("type_text", {"text": text})

        async def press_key(self, key):
            await self._send_cmd("press_key", {"key": key})

        async def hotkey(self, keys):
            await self._send_cmd("hotkey", {"keys": list(keys)})

        async def scroll_up(self, clicks: int = 1):
            await self._send_cmd("scroll_up", {"clicks": clicks})

        async def scroll_down(self, clicks: int = 1):
            await self._send_cmd("scroll_down", {"clicks": clicks})

        async def get_screen_size(self) -> dict[str, int]:
            """Return the size of the last screenshot, taking one if needed."""
            if not self._scene_size:
                await self.screenshot()
            return { "width": self._scene_size[0], "height": self._scene_size[1] }

        @staticmethod
        def _remap(hitboxes, x, y, src_key, dst_key):
            """Translate ``(x, y)`` from the ``src_key`` rect to the ``dst_key`` rect.

            Hitboxes are tried last to first. Entries whose source or
            destination rectangle is missing or not four values long are
            skipped instead of raising during unpacking.

            Returns:
                ``(hitbox, (tx, ty))`` for the first rectangle containing the
                point, or None when no rectangle contains it.
            """
            for hitbox in reversed(hitboxes):
                src = hitbox.get(src_key)
                dst = hitbox.get(dst_key)
                if not src or len(src) != 4 or not dst or len(dst) != 4:
                    continue
                x0, y0, x1, y1 = src
                if x0 <= x <= x1 and y0 <= y <= y1:
                    # Keep the offset from the source origin, move the origin.
                    return hitbox, (dst[0] + (x - x0), dst[1] + (y - y0))
            return None

        async def to_screen_coordinates(self, x: float, y: float) -> tuple[float, float]:
            """Convert screenshot coordinates to screen coordinates.

            Args:
                x: X absolute coordinate in screenshot space
                y: Y absolute coordinate in screenshot space

            Returns:
                tuple[float, float]: (x, y) absolute coordinates in screen
                space; the input is returned unchanged when no hitbox
                contains the point.
            """
            if not self._scene_hitboxes:
                await self.screenshot()  # get hitboxes
            match = self._remap(self._scene_hitboxes, x, y, "hitbox", "target")
            if match is None:
                return x, y
            hitbox, mapped = match
            logger.info(f"Found hitbox: {hitbox}")
            return mapped

        async def to_screenshot_coordinates(self, x: float, y: float) -> tuple[float, float]:
            """Convert screen coordinates to screenshot coordinates.

            Args:
                x: X absolute coordinate in screen space
                y: Y absolute coordinate in screen space

            Returns:
                tuple[float, float]: (x, y) absolute coordinates in screenshot
                space; the input is returned unchanged when no hitbox
                contains the point.
            """
            if not self._scene_hitboxes:
                await self.screenshot()  # get hitboxes
            match = self._remap(self._scene_hitboxes, x, y, "target", "hitbox")
            if match is None:
                return x, y
            return match[1]
import pyautogui
import time
async def main():
    """Manual demo: capture several virtual desktops and visualise hitboxes.

    Writes PNG files under ``app_screenshots/`` (created if missing) and
    ``current.png``, tracks the mouse until Ctrl+C is pressed, then moves the
    cursor along a sine-wave path across the third desktop.
    """
    import math

    # Image.save() does not create missing directories.
    os.makedirs("app_screenshots", exist_ok=True)

    # create_from_apps takes app names as separate positional arguments.
    desktop1 = Diorama.create_from_apps("Discord", "Notes")
    desktop2 = Diorama.create_from_apps("Terminal")
    img1 = await desktop1.interface.screenshot(as_bytes=False)
    img2 = await desktop2.interface.screenshot(as_bytes=False)
    img1.save("app_screenshots/desktop1.png")
    img2.save("app_screenshots/desktop2.png")

    # Initialize Diorama desktop
    desktop3 = Diorama.create_from_apps("Safari")
    screen_size = await desktop3.interface.get_screen_size()
    print(screen_size)

    # Take initial screenshot
    img = await desktop3.interface.screenshot(as_bytes=False)
    img.save("app_screenshots/desktop3.png")

    # Prepare hitboxes and draw them on a copy of the single screenshot
    hitboxes = desktop3.interface._scene_hitboxes[::-1]
    base_img = img.copy()
    draw = ImageDraw.Draw(base_img)
    for h in hitboxes:
        rect = h.get("hitbox")
        if not rect or len(rect) != 4:
            continue
        draw.rectangle(rect, outline="red", width=2)

    # Track and draw mouse position in real time (single screenshot size)
    last_mouse_pos = None
    print("Tracking mouse... Press Ctrl+C to stop.")
    try:
        while True:
            mouse_x, mouse_y = pyautogui.position()
            if last_mouse_pos != (mouse_x, mouse_y):
                last_mouse_pos = (mouse_x, mouse_y)
                # Map to screenshot coordinates
                sx, sy = await desktop3.interface.to_screenshot_coordinates(mouse_x, mouse_y)
                # Draw on a copy of the screenshot
                frame = base_img.copy()
                frame_draw = ImageDraw.Draw(frame)
                frame_draw.ellipse((sx-5, sy-5, sx+5, sy+5), fill="blue", outline="blue")
                # Save the frame
                frame.save("app_screenshots/desktop3_mouse.png")
                print(f"Mouse at screen ({mouse_x}, {mouse_y}) -> screenshot ({sx:.1f}, {sy:.1f})")
            # NOTE(review): this is a blocking sleep inside a coroutine; it is
            # kept because the Ctrl+C handling below relies on it -- confirm
            # before switching to asyncio.sleep.
            time.sleep(0.05)  # Throttle updates to ~20 FPS
    except KeyboardInterrupt:
        print("Stopped tracking.")

    # Label every hitbox with its index on a separate canvas. The previous
    # code referenced undefined names (idx, canvas) here and raised NameError.
    canvas = base_img.copy()
    canvas_draw = ImageDraw.Draw(canvas)
    for idx, h in enumerate(hitboxes):
        rect = h.get("hitbox")
        if not rect or len(rect) != 4:
            continue
        canvas_draw.text((rect[0], rect[1]), str(idx), fill="red")
    canvas.save("app_screenshots/desktop3_hitboxes.png")

    # Move the mouse along a sine wave across the screen
    step = 20  # pixels per move
    dot_radius = 10
    width = screen_size["width"]
    height = screen_size["height"]
    x, y = 0, 10
    while x < width and y < height:
        await desktop3.interface.move_cursor(x, y)
        img = await desktop3.interface.screenshot(as_bytes=False)
        draw = ImageDraw.Draw(img)
        draw.ellipse((x-dot_radius, y-dot_radius, x+dot_radius, y+dot_radius), fill="red")
        img.save("current.png")
        await asyncio.sleep(0.03)
        x += step
        y = math.sin(x / width * math.pi * 2) * 50 + 25


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -0,0 +1,26 @@
import asyncio
class DioramaComputer:
    """
    A minimal Computer-like interface for Diorama, compatible with ComputerAgent.

    Implements ``_initialized``, ``run()`` and the async context-manager
    protocol (``__aenter__``/``__aexit__``) for agent compatibility.
    """

    def __init__(self, diorama):
        """Wrap ``diorama`` and expose its interface as ``self.interface``."""
        self.diorama = diorama
        self.interface = self.diorama.interface
        self._initialized = False

    async def __aenter__(self):
        """Mark the computer as initialized and return it.

        A coroutine only executes inside a running event loop, so no loop
        setup is needed here.
        """
        self._initialized = True
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Complete the ``async with`` protocol; exceptions are not suppressed."""
        return None

    async def run(self):
        """Initialize the computer if needed and return it (compatibility stub)."""
        if not self._initialized:
            await self.__aenter__()
        return self

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,33 @@
import platform
import sys
import platform
import inspect
from computer_server.diorama.diorama import Diorama
from computer_server.diorama.base import BaseDioramaHandler
from typing import Optional
class MacOSDioramaHandler(BaseDioramaHandler):
    """Handler for Diorama commands on macOS, using local diorama module."""

    async def diorama_cmd(self, action: str, arguments: Optional[dict] = None) -> dict:
        """Run one public ``Diorama.Interface`` method on behalf of a client.

        Args:
            action: Name of the interface method to invoke. Names that start
                with an underscore or do not resolve to a callable are
                rejected, because the name is supplied by the remote caller
                and must not reach private helpers or plain attributes.
            arguments: Keyword arguments for the method. Must contain a
                non-empty ``app_list``, which selects the apps and is removed
                before the method is called.

        Returns:
            ``{"success": True, "result": ...}`` on success, otherwise
            ``{"success": False, "error": ...}`` (plus ``trace`` when an
            exception was raised).
        """
        if platform.system().lower() != "darwin":
            return {"success": False, "error": "Diorama is only supported on macOS."}
        try:
            app_list = arguments.get("app_list") if arguments else None
            if not app_list:
                return {"success": False, "error": "Missing 'app_list' in arguments"}
            interface = Diorama(app_list).interface

            method = None
            if isinstance(action, str) and not action.startswith("_"):
                method = getattr(interface, action, None)
            if not callable(method):
                return {"success": False, "error": f"Unknown diorama action: {action}"}

            # Remove app_list from arguments before calling the method
            call_kwargs = {key: value for key, value in arguments.items() if key != "app_list"}
            if inspect.iscoroutinefunction(method):
                result = await method(**call_kwargs)
            else:
                result = method(**call_kwargs)
            return {"success": True, "result": result}
        except Exception as e:
            import traceback
            return {"success": False, "error": str(e), "trace": traceback.format_exc()}

View File

@@ -0,0 +1,199 @@
#!/usr/bin/env python3
"""
UI Safezone Helper - A utility to get accurate bounds for macOS UI elements
This module provides helper functions to get accurate bounds for macOS UI elements
like the menubar and dock, which are needed for proper screenshot composition.
"""
import sys
import time
from typing import Dict, Any, Optional, Tuple
# Import Objective-C bridge libraries
try:
import AppKit
from ApplicationServices import (
AXUIElementCreateSystemWide,
AXUIElementCreateApplication,
AXUIElementCopyAttributeValue,
AXUIElementCopyAttributeValues,
kAXChildrenAttribute,
kAXRoleAttribute,
kAXTitleAttribute,
kAXPositionAttribute,
kAXSizeAttribute,
kAXErrorSuccess,
AXValueGetType,
kAXValueCGSizeType,
kAXValueCGPointType,
AXUIElementGetTypeID,
AXValueGetValue,
kAXMenuBarAttribute,
)
from AppKit import NSWorkspace, NSRunningApplication
import Foundation
except ImportError:
print("Error: This script requires PyObjC to be installed.")
print("Please install it with: pip install pyobjc")
sys.exit(1)
# Constants for accessibility API.
# NOTE(review): every name below except kAXSubroleAttribute is also imported
# from ApplicationServices above, so these assignments rebind the imported
# values with literals -- confirm the literals match the framework values.
kAXErrorSuccess = 0
kAXRoleAttribute = "AXRole"
kAXSubroleAttribute = "AXSubrole"
kAXTitleAttribute = "AXTitle"
kAXPositionAttribute = "AXPosition"
kAXSizeAttribute = "AXSize"
kAXChildrenAttribute = "AXChildren"
kAXMenuBarAttribute = "AXMenuBar"
def element_attribute(element, attribute):
    """Read ``attribute`` from an accessibility element.

    The children attribute is first requested through the multi-value call
    (up to 999 entries) and returned as a Python list when it comes back as
    an NSArray. If that call fails, or for any other attribute, the
    single-value call is used.

    Returns:
        The attribute value, or None when the lookup does not succeed.
    """
    if attribute == kAXChildrenAttribute:
        status, children = AXUIElementCopyAttributeValues(element, attribute, 0, 999, None)
        if status == kAXErrorSuccess:
            return list(children) if isinstance(children, Foundation.NSArray) else children

    status, result = AXUIElementCopyAttributeValue(element, attribute, None)
    return result if status == kAXErrorSuccess else None
def element_value(element, type):
    """Unpack an accessibility value object as the requested value type.

    Returns:
        The unpacked value, or None when AXValueGetValue does not report
        success.
    """
    ok, unpacked = AXValueGetValue(element, type, None)
    return unpacked if ok == True else None
def get_element_bounds(element):
    """Return the position and size of an accessibility element.

    Returns:
        Dict with ``x``, ``y``, ``width`` and ``height``. Any component that
        cannot be read is left at 0.
    """
    x = y = width = height = 0

    # Position: fetch the raw attribute, then unpack it as a CGPoint.
    raw_position = element_attribute(element, kAXPositionAttribute)
    point = element_value(raw_position, kAXValueCGPointType) if raw_position else None
    if point:
        x = point.x
        y = point.y

    # Size: fetch the raw attribute, then unpack it as a CGSize.
    raw_size = element_attribute(element, kAXSizeAttribute)
    extent = element_value(raw_size, kAXValueCGSizeType) if raw_size else None
    if extent:
        width = extent.width
        height = extent.height

    return {"x": x, "y": y, "width": width, "height": height}
def find_dock_process():
    """Return the process identifier of the Dock, or None if it is not running.

    The Dock is recognised by both its localized name and its bundle
    identifier.
    """
    candidates = NSWorkspace.sharedWorkspace().runningApplications()
    return next(
        (
            candidate.processIdentifier()
            for candidate in candidates
            if candidate.localizedName() == "Dock"
            and candidate.bundleIdentifier() == "com.apple.dock"
        ),
        None,
    )
def get_menubar_bounds():
    """Get the bounds of the macOS menubar.

    The menubar is looked up on the system-wide accessibility element first
    and, failing that, on the frontmost application.

    Returns:
        Dictionary with x, y, width, height of the menubar. A fixed default
        is returned when the menubar cannot be found.
    """
    menubar = element_attribute(AXUIElementCreateSystemWide(), kAXMenuBarAttribute)

    if menubar is None:
        # Fall back to the menubar exposed by the frontmost application.
        front_app = NSWorkspace.sharedWorkspace().frontmostApplication()
        if front_app:
            front_element = AXUIElementCreateApplication(front_app.processIdentifier())
            menubar = element_attribute(front_element, kAXMenuBarAttribute)

    if menubar is not None:
        return get_element_bounds(menubar)

    print("Error: Could not get menubar")
    # Default menubar bounds used as a fallback.
    return {"x": 0, "y": 0, "width": 1800, "height": 24}
def get_dock_bounds():
    """Get the bounds of the macOS Dock.

    Returns:
        Dictionary with x, y, width, height of the Dock. All-zero bounds are
        returned (after printing an error) when any lookup step fails.
    """
    def _no_bounds(message):
        # Report the failure and hand back a fresh all-zero bounds dict.
        print(message)
        return {"x": 0, "y": 0, "width": 0, "height": 0}

    pid = find_dock_process()
    if pid is None:
        return _no_bounds("Error: Could not find Dock process")

    dock = AXUIElementCreateApplication(pid)
    if dock is None:
        return _no_bounds(f"Error: Could not create accessibility element for Dock (PID {pid})")

    children = element_attribute(dock, kAXChildrenAttribute)
    if not children or len(children) == 0:
        return _no_bounds("Error: Could not get Dock children")

    # The first child whose role is AXList is taken as the main dock list.
    for child in children:
        if element_attribute(child, kAXRoleAttribute) == "AXList":
            return get_element_bounds(child)

    return _no_bounds("Error: Could not find Dock list")
def get_ui_element_bounds():
    """Collect the bounds of the menubar and the Dock.

    Returns:
        Dictionary with ``menubar`` and ``dock`` entries, each holding the
        bounds dict produced by the corresponding lookup.
    """
    # The menubar is queried before the Dock.
    return {
        "menubar": get_menubar_bounds(),
        "dock": get_dock_bounds(),
    }
if __name__ == "__main__":
    # Example usage: print the menubar and Dock bounds when run as a script.
    bounds = get_ui_element_bounds()
    print("Menubar bounds:", bounds["menubar"])
    print("Dock bounds:", bounds["dock"])

View File

@@ -2,11 +2,13 @@ import platform
import subprocess
from typing import Tuple, Type
from .base import BaseAccessibilityHandler, BaseAutomationHandler
from computer_server.diorama.base import BaseDioramaHandler
# Conditionally import platform-specific handlers
system = platform.system().lower()
if system == 'darwin':
from .macos import MacOSAccessibilityHandler, MacOSAutomationHandler
from computer_server.diorama.macos import MacOSDioramaHandler
elif system == 'linux':
from .linux import LinuxAccessibilityHandler, LinuxAutomationHandler
@@ -38,13 +40,13 @@ class HandlerFactory:
raise RuntimeError(f"Failed to determine current OS: {str(e)}")
@staticmethod
def create_handlers() -> Tuple[BaseAccessibilityHandler, BaseAutomationHandler]:
def create_handlers() -> Tuple[BaseAccessibilityHandler, BaseAutomationHandler, BaseDioramaHandler]:
"""Create and return appropriate handlers for the current OS.
Returns:
Tuple[BaseAccessibilityHandler, BaseAutomationHandler]: A tuple containing
the appropriate accessibility and automation handlers for the current OS.
Tuple[BaseAccessibilityHandler, BaseAutomationHandler, BaseDioramaHandler]: A tuple containing
the appropriate accessibility, automation, and diorama handlers for the current OS.
Raises:
NotImplementedError: If the current OS is not supported
RuntimeError: If unable to determine the current OS
@@ -52,8 +54,8 @@ class HandlerFactory:
os_type = HandlerFactory._get_current_os()
if os_type == 'darwin':
return MacOSAccessibilityHandler(), MacOSAutomationHandler()
return MacOSAccessibilityHandler(), MacOSAutomationHandler(), MacOSDioramaHandler()
elif os_type == 'linux':
return LinuxAccessibilityHandler(), LinuxAutomationHandler()
return LinuxAccessibilityHandler(), LinuxAutomationHandler(), BaseDioramaHandler()
else:
raise NotImplementedError(f"OS '{os_type}' is not supported")

View File

@@ -33,12 +33,19 @@ from ApplicationServices import (
AXValueGetValue, # type: ignore
kAXVisibleChildrenAttribute, # type: ignore
kAXRoleDescriptionAttribute, # type: ignore
kAXFocusedApplicationAttribute, # type: ignore
kAXFocusedUIElementAttribute, # type: ignore
kAXSelectedTextAttribute, # type: ignore
kAXSelectedTextRangeAttribute, # type: ignore
)
import objc
import re
import json
import copy
from .base import BaseAccessibilityHandler, BaseAutomationHandler
import logging
logger = logging.getLogger(__name__)
def CFAttributeToPyObject(attrValue):
@@ -317,7 +324,7 @@ class UIElement:
size = f"{self.size.width:.0f};{self.size.height:.0f}"
else:
size = ""
return {
"id": self.identifier,
"name": self.name,
@@ -329,6 +336,7 @@ class UIElement:
"position": position,
"size": size,
"enabled": self.enabled,
"focused": self.focused,
"bbox": self.bbox,
"visible_bbox": self.visible_bbox,
"children": children_to_dict(self.children),
@@ -444,7 +452,9 @@ class MacOSAccessibilityHandler(BaseAccessibilityHandler):
try:
window_element = UIElement(window)
window_trees.append(window_element.to_dict())
except:
except Exception as e:
logger.error(f"Failed to process window {window}: {e}")
window_trees.append({"error": str(e)})
continue
processed_windows.append(
@@ -515,7 +525,6 @@ class MacOSAccessibilityHandler(BaseAccessibilityHandler):
except Exception as e:
return {"success": False, "error": str(e)}
class MacOSAutomationHandler(BaseAutomationHandler):
# Mouse Actions
async def left_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:

View File

@@ -31,7 +31,7 @@ class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
# Create OS-specific handlers
self.accessibility_handler, self.automation_handler = HandlerFactory.create_handlers()
self.accessibility_handler, self.automation_handler, self.diorama_handler = HandlerFactory.create_handlers()
async def connect(self, websocket: WebSocket):
await websocket.accept()
@@ -178,6 +178,7 @@ async def websocket_endpoint(websocket: WebSocket):
"copy_to_clipboard": manager.automation_handler.copy_to_clipboard,
"set_clipboard": manager.automation_handler.set_clipboard,
"run_command": manager.automation_handler.run_command,
"diorama_cmd": manager.diorama_handler.diorama_cmd,
}
try:

View File

@@ -21,6 +21,20 @@ OSType = Literal["macos", "linux", "windows"]
class Computer:
"""Computer is the main class for interacting with the computer."""
def create_desktop_from_apps(self, apps):
    """
    Create a virtual desktop from a list of app names, returning a DioramaComputer
    that proxies Diorama.Interface but uses diorama_cmds via the computer interface.

    Args:
        apps (list[str]): List of application names to include in the desktop.

    Returns:
        DioramaComputer: A proxy object with the Diorama interface, but using diorama_cmds.

    Raises:
        AssertionError: If the "app-use" experiment is not enabled.
    """
    # Raised explicitly rather than via ``assert`` so the gate is still
    # enforced under ``python -O``; the exception type is kept unchanged.
    if "app-use" not in self.experiments:
        raise AssertionError(
            "App Usage is an experimental feature. Enable it by passing experiments=['app-use'] to Computer()"
        )
    from .diorama_computer import DioramaComputer
    return DioramaComputer(self, apps)
def __init__(
self,
display: Union[Display, Dict[str, int], str] = "1024x768",
@@ -39,7 +53,8 @@ class Computer:
host: str = os.environ.get("PYLUME_HOST", "localhost"),
storage: Optional[str] = None,
ephemeral: bool = False,
api_key: Optional[str] = None
api_key: Optional[str] = None,
experiments: Optional[List[str]] = None
):
"""Initialize a new Computer instance.
@@ -65,6 +80,8 @@ class Computer:
host: Host to use for VM provider connections (e.g. "localhost", "host.docker.internal")
storage: Optional path for persistent VM storage (Lumier provider)
ephemeral: Whether to use ephemeral storage
api_key: Optional API key for cloud providers
experiments: Optional list of experimental features to enable (e.g. ["app-use"])
"""
self.logger = Logger("cua.computer", verbosity)
@@ -80,6 +97,10 @@ class Computer:
self.ephemeral = ephemeral
self.api_key = api_key
self.experiments = experiments or []
if "app-use" in self.experiments:
assert self.os_type == "macos", "App use experiment is only supported on macOS"
# The default is currently to use non-ephemeral storage
if storage and ephemeral and storage != "ephemeral":

View File

@@ -0,0 +1,93 @@
import asyncio
class DioramaComputer:
    """
    A Computer-compatible proxy for Diorama that sends commands over the ComputerInterface.

    Supports ``run()`` and the async context-manager protocol
    (``__aenter__``/``__aexit__``).
    """

    def __init__(self, computer, apps):
        """Remember the backing computer and app list and build the interface proxy."""
        self.computer = computer
        self.apps = apps
        self.interface = DioramaComputerInterface(computer, apps)
        self._initialized = False

    async def __aenter__(self):
        """Mark the proxy as initialized and return it."""
        self._initialized = True
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Complete the ``async with`` protocol; exceptions are not suppressed."""
        return None

    async def run(self):
        """Initialize the proxy if needed and return it."""
        if not self._initialized:
            await self.__aenter__()
        return self
class DioramaComputerInterface:
    """
    Proxy for the Diorama interface that forwards every call as a diorama_cmd
    through the Computer's interface.
    """

    def __init__(self, computer, apps):
        self.computer = computer
        self.apps = apps
        # Size of the most recent screenshot; filled in by screenshot().
        self._scene_size = None

    async def _send_cmd(self, action, arguments=None):
        """Send ``action`` with the app list and return the command's result.

        Raises:
            RuntimeError: If the computer has no ``_interface`` yet, or the
                response does not report success.
        """
        payload = {"app_list": self.apps}
        payload.update(arguments or {})
        # Use the computer's interface (must be initialized)
        transport = getattr(self.computer, "_interface", None)
        if transport is None:
            raise RuntimeError("Computer interface not initialized. Call run() first.")
        response = await transport.diorama_cmd(action, payload)
        if response.get("success"):
            return response.get("result")
        raise RuntimeError(f"Diorama command failed: {response.get('error')}")

    async def screenshot(self, as_bytes=True):
        """Take a screenshot and return it as PNG bytes or as a PIL image."""
        import base64
        import io
        from PIL import Image

        # The result is assumed to be a base64 string of an image.
        encoded = await self._send_cmd("screenshot")
        raw_png = base64.b64decode(encoded)
        image = Image.open(io.BytesIO(raw_png))
        self._scene_size = image.size
        if as_bytes:
            return raw_png
        return image

    async def get_screen_size(self):
        """Return the size of the last screenshot, taking one if needed."""
        if not self._scene_size:
            await self.screenshot(as_bytes=False)
        scene_width, scene_height = self._scene_size[0], self._scene_size[1]
        return {"width": scene_width, "height": scene_height}

    async def move_cursor(self, x, y):
        await self._send_cmd("move_cursor", {"x": x, "y": y})

    async def left_click(self, x=None, y=None):
        await self._send_cmd("left_click", {"x": x, "y": y})

    async def right_click(self, x=None, y=None):
        await self._send_cmd("right_click", {"x": x, "y": y})

    async def double_click(self, x=None, y=None):
        await self._send_cmd("double_click", {"x": x, "y": y})

    async def scroll_up(self, clicks=1):
        await self._send_cmd("scroll_up", {"clicks": clicks})

    async def scroll_down(self, clicks=1):
        await self._send_cmd("scroll_down", {"clicks": clicks})

    async def drag_to(self, x, y, duration=0.5):
        await self._send_cmd("drag_to", {"x": x, "y": y, "duration": duration})

    async def get_cursor_position(self):
        position = await self._send_cmd("get_cursor_position")
        return position

    async def type_text(self, text):
        await self._send_cmd("type_text", {"text": text})

    async def press_key(self, key):
        await self._send_cmd("press_key", {"key": key})

    async def hotkey(self, *keys):
        await self._send_cmd("hotkey", {"keys": list(keys)})

    async def to_screen_coordinates(self, x, y):
        mapped = await self._send_cmd("to_screen_coordinates", {"x": x, "y": y})
        return mapped

View File

@@ -177,7 +177,7 @@ class BaseComputerInterface(ABC):
async def get_accessibility_tree(self) -> Dict:
"""Get the accessibility tree of the current screen."""
pass
@abstractmethod
async def to_screen_coordinates(self, x: float, y: float) -> tuple[float, float]:
"""Convert screenshot coordinates to screen coordinates.

View File

@@ -346,6 +346,10 @@ class MacOSComputerInterface(BaseComputerInterface):
asyncio.create_task(self._ws.close())
self._ws = None
async def diorama_cmd(self, action: str, arguments: Optional[dict] = None) -> dict:
    """Forward a Diorama action to the server as a ``diorama_cmd`` command (macOS only).

    Args:
        action: Name of the Diorama action to perform.
        arguments: Arguments for the action; an empty dict is sent when omitted.

    Returns:
        Whatever ``_send_command`` returns for the command.
    """
    payload = {"action": action, "arguments": arguments if arguments else {}}
    return await self._send_command("diorama_cmd", payload)
# Mouse Actions
async def left_click(self, x: Optional[int] = None, y: Optional[int] = None) -> None:
await self._send_command("left_click", {"x": x, "y": y})
@@ -568,7 +572,7 @@ class MacOSComputerInterface(BaseComputerInterface):
if not result.get("success", False):
raise RuntimeError(result.get("error", "Failed to get accessibility tree"))
return result
async def get_active_window_bounds(self) -> Dict[str, int]:
"""Get the bounds of the currently active window."""
result = await self._send_command("get_active_window_bounds")