Merge pull request #390 from onel/reference-docs-20250901_145129

Reference documentation batch
This commit is contained in:
ddupont
2025-09-05 11:17:35 -04:00
committed by GitHub
5 changed files with 829 additions and 27 deletions

View File

@@ -20,6 +20,12 @@ logger = logging.getLogger(__name__)
automation_handler = MacOSAutomationHandler()
class Diorama:
"""Virtual desktop manager that provides automation capabilities for macOS applications.
Manages application windows and provides an interface for taking screenshots,
mouse interactions, keyboard input, and coordinate transformations between
screenshot space and screen space.
"""
_scheduler_queue = None
_scheduler_task = None
_loop = None
@@ -27,6 +33,14 @@ class Diorama:
@classmethod
def create_from_apps(cls, *args) -> DioramaComputer:
    """Build a Diorama for the given applications and hand back its computer.

    The shared scheduler is started first (if needed); the positional
    arguments are then passed, as a single tuple, to the constructor.

    Args:
        *args: Application names, one per positional argument.

    Returns:
        DioramaComputer: The ``computer`` attribute of the new instance.
    """
    # NOTE(review): the demo ``main`` in this file passes one list rather than
    # separate names, which makes ``app_list`` a 1-tuple holding that list --
    # confirm which calling convention is intended.
    cls._ensure_scheduler()
    diorama = cls(args)
    return diorama.computer
@@ -34,6 +48,11 @@ class Diorama:
_cursor_positions = {}
def __init__(self, app_list):
"""Initialize a Diorama instance for the specified applications.
Args:
app_list: List of application names to manage
"""
self.app_list = app_list
self.interface = self.Interface(self)
self.computer = DioramaComputer(self)
@@ -48,6 +67,10 @@ class Diorama:
@classmethod
def _ensure_scheduler(cls):
"""Ensure the async scheduler loop is running.
Creates and starts the scheduler task if it hasn't been started yet.
"""
if not cls._scheduler_started:
logger.info("Starting Diorama scheduler loop…")
cls._scheduler_queue = asyncio.Queue()
@@ -57,6 +80,11 @@ class Diorama:
@classmethod
async def _scheduler_loop(cls):
"""Main scheduler loop that processes automation commands.
Continuously processes commands from the scheduler queue, handling
screenshots, mouse actions, keyboard input, and scrolling operations.
"""
while True:
cmd = await cls._scheduler_queue.get()
action = cmd.get("action")
@@ -144,13 +172,33 @@ class Diorama:
future.set_exception(e)
class Interface():
"""Interface for interacting with the virtual desktop.
Provides methods for taking screenshots, mouse interactions, keyboard input,
and coordinate transformations between screenshot and screen coordinates.
"""
def __init__(self, diorama):
"""Initialize the interface with a reference to the parent Diorama instance.
Args:
diorama: The parent Diorama instance
"""
self._diorama = diorama
self._scene_hitboxes = []
self._scene_size = None
async def _send_cmd(self, action, arguments=None):
"""Send a command to the scheduler queue.
Args:
action (str): The action to perform
arguments (dict, optional): Arguments for the action
Returns:
The result of the command execution
"""
Diorama._ensure_scheduler()
loop = asyncio.get_event_loop()
future = loop.create_future()
@@ -167,6 +215,14 @@ class Diorama:
return None
async def screenshot(self, as_bytes: bool = True) -> Union[str, Image.Image]:
"""Take a screenshot of the managed applications.
Args:
as_bytes (bool): If True, return base64-encoded bytes; if False, return PIL Image
Returns:
Union[str, Image.Image]: Base64-encoded PNG bytes or PIL Image object
"""
import base64
result, img = await self._send_cmd("screenshot")
self._scene_hitboxes = result.get("hitboxes", [])
@@ -184,6 +240,12 @@ class Diorama:
return img
async def left_click(self, x, y):
"""Perform a left mouse click at the specified coordinates.
Args:
x (int): X coordinate in screenshot space (or None to use last position)
y (int): Y coordinate in screenshot space (or None to use last position)
"""
# Get last cursor position for this app_list hash
app_list_hash = hash(tuple(sorted(self._diorama.app_list)))
last_pos = Diorama._cursor_positions.get(app_list_hash, (0, 0))
@@ -195,6 +257,12 @@ class Diorama:
await self._send_cmd("left_click", {"x": sx, "y": sy})
async def right_click(self, x, y):
"""Perform a right mouse click at the specified coordinates.
Args:
x (int): X coordinate in screenshot space (or None to use last position)
y (int): Y coordinate in screenshot space (or None to use last position)
"""
# Get last cursor position for this app_list hash
app_list_hash = hash(tuple(sorted(self._diorama.app_list)))
last_pos = Diorama._cursor_positions.get(app_list_hash, (0, 0))
@@ -206,6 +274,12 @@ class Diorama:
await self._send_cmd("right_click", {"x": sx, "y": sy})
async def double_click(self, x, y):
"""Perform a double mouse click at the specified coordinates.
Args:
x (int): X coordinate in screenshot space (or None to use last position)
y (int): Y coordinate in screenshot space (or None to use last position)
"""
# Get last cursor position for this app_list hash
app_list_hash = hash(tuple(sorted(self._diorama.app_list)))
last_pos = Diorama._cursor_positions.get(app_list_hash, (0, 0))
@@ -217,6 +291,12 @@ class Diorama:
await self._send_cmd("double_click", {"x": sx, "y": sy})
async def move_cursor(self, x, y):
"""Move the mouse cursor to the specified coordinates.
Args:
x (int): X coordinate in screenshot space (or None to use last position)
y (int): Y coordinate in screenshot space (or None to use last position)
"""
# Get last cursor position for this app_list hash
app_list_hash = hash(tuple(sorted(self._diorama.app_list)))
last_pos = Diorama._cursor_positions.get(app_list_hash, (0, 0))
@@ -228,6 +308,13 @@ class Diorama:
await self._send_cmd("move_cursor", {"x": sx, "y": sy})
async def drag_to(self, x, y, duration=0.5):
"""Drag the mouse from current position to the specified coordinates.
Args:
x (int): X coordinate in screenshot space (or None to use last position)
y (int): Y coordinate in screenshot space (or None to use last position)
duration (float): Duration of the drag operation in seconds
"""
# Get last cursor position for this app_list hash
app_list_hash = hash(tuple(sorted(self._diorama.app_list)))
last_pos = Diorama._cursor_positions.get(app_list_hash, (0, 0))
@@ -239,18 +326,43 @@ class Diorama:
await self._send_cmd("drag_to", {"x": sx, "y": sy, "duration": duration})
async def get_cursor_position(self):
    """Query the scheduler for the cursor position.

    Returns:
        The result of the ``get_cursor_position`` command, passed through
        unchanged (described by the original author as an ``(x, y)`` tuple
        in screen space).
    """
    position = await self._send_cmd("get_cursor_position")
    return position
async def type_text(self, text):
    """Queue a ``type_text`` command carrying the given text.

    Args:
        text (str): The text to type.
    """
    payload = {"text": text}
    await self._send_cmd("type_text", payload)
async def press_key(self, key):
    """Queue a ``press_key`` command for a single key.

    Args:
        key (str): The key to press.
    """
    payload = {"key": key}
    await self._send_cmd("press_key", payload)
async def hotkey(self, keys):
    """Queue a ``hotkey`` command for a key combination.

    Args:
        keys: Iterable of key names; it is copied into a new list before
            being sent.
    """
    combo = list(keys)
    await self._send_cmd("hotkey", {"keys": combo})
async def scroll_up(self, clicks: int = 1):
"""Scroll up at the current cursor position.
Args:
clicks (int): Number of scroll clicks to perform
"""
# Get last cursor position for this app_list hash
app_list_hash = hash(tuple(sorted(self._diorama.app_list)))
last_pos = Diorama._cursor_positions.get(app_list_hash, (0, 0))
@@ -259,6 +371,11 @@ class Diorama:
await self._send_cmd("scroll_up", {"clicks": clicks, "x": x, "y": y})
async def scroll_down(self, clicks: int = 1):
"""Scroll down at the current cursor position.
Args:
clicks (int): Number of scroll clicks to perform
"""
# Get last cursor position for this app_list hash
app_list_hash = hash(tuple(sorted(self._diorama.app_list)))
last_pos = Diorama._cursor_positions.get(app_list_hash, (0, 0))
@@ -267,6 +384,11 @@ class Diorama:
await self._send_cmd("scroll_down", {"clicks": clicks, "x": x, "y": y})
async def get_screen_size(self) -> dict[str, int]:
    """Report the dimensions of the cached scene.

    When no scene size is cached yet, a screenshot is taken first; the
    first two entries of ``_scene_size`` are then read.

    Returns:
        dict[str, int]: Mapping with ``'width'`` and ``'height'`` keys.
    """
    if not self._scene_size:
        await self.screenshot()
    width = self._scene_size[0]
    height = self._scene_size[1]
    return {"width": width, "height": height}
@@ -348,6 +470,7 @@ import pyautogui
import time
async def main():
"""Main function demonstrating Diorama usage with multiple desktops and mouse tracking."""
desktop1 = Diorama.create_from_apps(["Discord", "Notes"])
desktop2 = Diorama.create_from_apps(["Terminal"])

View File

@@ -12,35 +12,96 @@ from .base import BaseFileHandler
import base64
def resolve_path(path: str) -> Path:
    """Resolve a path to its absolute form, expanding ``~`` to the home directory.

    Args:
        path: The file or directory path to resolve.

    Returns:
        Path: The resolved absolute path.
    """
    # The superseded one-line docstring that sat beside this one was a stray
    # no-op string literal; only a single docstring is kept.
    return Path(path).expanduser().resolve()
class GenericFileHandler(BaseFileHandler):
"""
Generic file handler that provides file system operations for all operating systems.
This class implements the BaseFileHandler interface and provides methods for
file and directory operations including reading, writing, creating, and deleting
files and directories.
"""
async def file_exists(self, path: str) -> Dict[str, Any]:
    """Report whether ``path`` resolves to a regular file.

    Args:
        path: The file path to check.

    Returns:
        ``{"success": True, "exists": <bool>}`` normally, or
        ``{"success": False, "error": <message>}`` if the check raised.
    """
    try:
        found = resolve_path(path).is_file()
    except Exception as exc:
        return {"success": False, "error": str(exc)}
    return {"success": True, "exists": found}
async def directory_exists(self, path: str) -> Dict[str, Any]:
    """Report whether ``path`` resolves to a directory.

    Args:
        path: The directory path to check.

    Returns:
        ``{"success": True, "exists": <bool>}`` normally, or
        ``{"success": False, "error": <message>}`` if the check raised.
    """
    try:
        found = resolve_path(path).is_dir()
    except Exception as exc:
        return {"success": False, "error": str(exc)}
    return {"success": True, "exists": found}
async def list_dir(self, path: str) -> Dict[str, Any]:
    """List the names of entries directly inside a directory.

    Only entries that test as a file or a directory are included.

    Args:
        path: The directory path to list.

    Returns:
        ``{"success": True, "files": [<name>, ...]}`` normally, or
        ``{"success": False, "error": <message>}`` if listing raised.
    """
    try:
        names = []
        for entry in resolve_path(path).iterdir():
            if entry.is_file() or entry.is_dir():
                names.append(entry.name)
    except Exception as exc:
        return {"success": False, "error": str(exc)}
    return {"success": True, "files": names}
async def read_text(self, path: str) -> Dict[str, Any]:
    """Read a file's contents as text.

    Args:
        path: The file path to read from.

    Returns:
        ``{"success": True, "content": <str>}`` normally, or
        ``{"success": False, "error": <message>}`` if reading raised.
    """
    try:
        text = resolve_path(path).read_text()
    except Exception as exc:
        return {"success": False, "error": str(exc)}
    return {"success": True, "content": text}
async def write_text(self, path: str, content: str) -> Dict[str, Any]:
"""
Write text content to a file.
Args:
path: The file path to write to
content: The text content to write
Returns:
Dict containing 'success' boolean and optionally 'error' string
"""
try:
resolve_path(path).write_text(content)
return {"success": True}
@@ -48,6 +109,17 @@ class GenericFileHandler(BaseFileHandler):
return {"success": False, "error": str(e)}
async def write_bytes(self, path: str, content_b64: str, append: bool = False) -> Dict[str, Any]:
"""
Write binary content to a file from base64 encoded string.
Args:
path: The file path to write to
content_b64: Base64 encoded binary content
append: If True, append to existing file; if False, overwrite
Returns:
Dict containing 'success' boolean and optionally 'error' string
"""
try:
mode = 'ab' if append else 'wb'
with open(resolve_path(path), mode) as f:
@@ -57,6 +129,17 @@ class GenericFileHandler(BaseFileHandler):
return {"success": False, "error": str(e)}
async def read_bytes(self, path: str, offset: int = 0, length: Optional[int] = None) -> Dict[str, Any]:
"""
Read binary content from a file and return as base64 encoded string.
Args:
path: The file path to read from
offset: Byte offset to start reading from
length: Number of bytes to read; if None, read entire file from offset
Returns:
Dict containing 'success' boolean and either 'content_b64' string or 'error' string
"""
try:
file_path = resolve_path(path)
with open(file_path, 'rb') as f:
@@ -73,6 +156,15 @@ class GenericFileHandler(BaseFileHandler):
return {"success": False, "error": str(e)}
async def get_file_size(self, path: str) -> Dict[str, Any]:
"""
Get the size of a file in bytes.
Args:
path: The file path to get size for
Returns:
Dict containing 'success' boolean and either 'size' integer or 'error' string
"""
try:
file_path = resolve_path(path)
size = file_path.stat().st_size
@@ -81,6 +173,15 @@ class GenericFileHandler(BaseFileHandler):
return {"success": False, "error": str(e)}
async def delete_file(self, path: str) -> Dict[str, Any]:
"""
Delete a file at the specified path.
Args:
path: The file path to delete
Returns:
Dict containing 'success' boolean and optionally 'error' string
"""
try:
resolve_path(path).unlink()
return {"success": True}
@@ -88,6 +189,18 @@ class GenericFileHandler(BaseFileHandler):
return {"success": False, "error": str(e)}
async def create_dir(self, path: str) -> Dict[str, Any]:
"""
Create a directory at the specified path.
Creates parent directories if they don't exist and doesn't raise an error
if the directory already exists.
Args:
path: The directory path to create
Returns:
Dict containing 'success' boolean and optionally 'error' string
"""
try:
resolve_path(path).mkdir(parents=True, exist_ok=True)
return {"success": True}
@@ -95,6 +208,15 @@ class GenericFileHandler(BaseFileHandler):
return {"success": False, "error": str(e)}
async def delete_dir(self, path: str) -> Dict[str, Any]:
"""
Delete an empty directory at the specified path.
Args:
path: The directory path to delete
Returns:
Dict containing 'success' boolean and optionally 'error' string
"""
try:
resolve_path(path).rmdir()
return {"success": True}

View File

@@ -38,7 +38,12 @@ class LinuxAccessibilityHandler(BaseAccessibilityHandler):
"""Linux implementation of accessibility handler."""
async def get_accessibility_tree(self) -> Dict[str, Any]:
"""Get the accessibility tree of the current window."""
"""Get the accessibility tree of the current window.
Returns:
Dict[str, Any]: A dictionary containing success status and a simulated tree structure
since Linux doesn't have equivalent accessibility API like macOS.
"""
# Linux doesn't have equivalent accessibility API like macOS
# Return a minimal dummy tree
logger.info("Getting accessibility tree (simulated, no accessibility API available on Linux)")
@@ -56,7 +61,16 @@ class LinuxAccessibilityHandler(BaseAccessibilityHandler):
async def find_element(self, role: Optional[str] = None,
title: Optional[str] = None,
value: Optional[str] = None) -> Dict[str, Any]:
"""Find an element in the accessibility tree by criteria."""
"""Find an element in the accessibility tree by criteria.
Args:
role: The role of the element to find.
title: The title of the element to find.
value: The value of the element to find.
Returns:
Dict[str, Any]: A dictionary indicating that element search is not supported on Linux.
"""
logger.info(f"Finding element with role={role}, title={title}, value={value} (not supported on Linux)")
return {
"success": False,
@@ -64,7 +78,12 @@ class LinuxAccessibilityHandler(BaseAccessibilityHandler):
}
def get_cursor_position(self) -> Tuple[int, int]:
"""Get the current cursor position."""
"""Get the current cursor position.
Returns:
Tuple[int, int]: The x and y coordinates of the cursor position.
Returns (0, 0) if pyautogui is not available.
"""
try:
pos = pyautogui.position()
return pos.x, pos.y
@@ -75,7 +94,12 @@ class LinuxAccessibilityHandler(BaseAccessibilityHandler):
return 0, 0
def get_screen_size(self) -> Tuple[int, int]:
"""Get the screen size."""
"""Get the screen size.
Returns:
Tuple[int, int]: The width and height of the screen in pixels.
Returns (1920, 1080) if pyautogui is not available.
"""
try:
size = pyautogui.size()
return size.width, size.height
@@ -92,6 +116,16 @@ class LinuxAutomationHandler(BaseAutomationHandler):
# Mouse Actions
async def mouse_down(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
"""Press and hold a mouse button at the specified coordinates.
Args:
x: The x coordinate to move to before pressing. If None, uses current position.
y: The y coordinate to move to before pressing. If None, uses current position.
button: The mouse button to press ("left", "right", or "middle").
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
if x is not None and y is not None:
pyautogui.moveTo(x, y)
@@ -101,6 +135,16 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def mouse_up(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
"""Release a mouse button at the specified coordinates.
Args:
x: The x coordinate to move to before releasing. If None, uses current position.
y: The y coordinate to move to before releasing. If None, uses current position.
button: The mouse button to release ("left", "right", or "middle").
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
if x is not None and y is not None:
pyautogui.moveTo(x, y)
@@ -110,6 +154,15 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def move_cursor(self, x: int, y: int) -> Dict[str, Any]:
"""Move the cursor to the specified coordinates.
Args:
x: The x coordinate to move to.
y: The y coordinate to move to.
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
pyautogui.moveTo(x, y)
return {"success": True}
@@ -117,6 +170,15 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def left_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
"""Perform a left mouse click at the specified coordinates.
Args:
x: The x coordinate to click at. If None, clicks at current position.
y: The y coordinate to click at. If None, clicks at current position.
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
if x is not None and y is not None:
pyautogui.moveTo(x, y)
@@ -126,6 +188,15 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def right_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
"""Perform a right mouse click at the specified coordinates.
Args:
x: The x coordinate to click at. If None, clicks at current position.
y: The y coordinate to click at. If None, clicks at current position.
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
if x is not None and y is not None:
pyautogui.moveTo(x, y)
@@ -135,6 +206,15 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def double_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
"""Perform a double click at the specified coordinates.
Args:
x: The x coordinate to double click at. If None, clicks at current position.
y: The y coordinate to double click at. If None, clicks at current position.
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
if x is not None and y is not None:
pyautogui.moveTo(x, y)
@@ -144,6 +224,16 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def click(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
"""Perform a mouse click with the specified button at the given coordinates.
Args:
x: The x coordinate to click at. If None, clicks at current position.
y: The y coordinate to click at. If None, clicks at current position.
button: The mouse button to click ("left", "right", or "middle").
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
if x is not None and y is not None:
pyautogui.moveTo(x, y)
@@ -153,6 +243,17 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def drag_to(self, x: int, y: int, button: str = "left", duration: float = 0.5) -> Dict[str, Any]:
"""Drag from the current position to the specified coordinates.
Args:
x: The x coordinate to drag to.
y: The y coordinate to drag to.
button: The mouse button to use for dragging.
duration: The time in seconds to take for the drag operation.
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
pyautogui.dragTo(x, y, duration=duration, button=button)
return {"success": True}
@@ -160,6 +261,18 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def drag(self, start_x: int, start_y: int, end_x: int, end_y: int, button: str = "left") -> Dict[str, Any]:
"""Drag from start coordinates to end coordinates.
Args:
start_x: The starting x coordinate.
start_y: The starting y coordinate.
end_x: The ending x coordinate.
end_y: The ending y coordinate.
button: The mouse button to use for dragging.
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
pyautogui.moveTo(start_x, start_y)
pyautogui.dragTo(end_x, end_y, duration=0.5, button=button)
@@ -168,6 +281,16 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def drag_path(self, path: List[Tuple[int, int]], button: str = "left", duration: float = 0.5) -> Dict[str, Any]:
"""Drag along a path defined by a list of coordinates.
Args:
path: A list of (x, y) coordinate tuples defining the drag path.
button: The mouse button to use for dragging.
duration: The time in seconds to take for each segment of the drag.
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
if not path:
return {"success": False, "error": "Path is empty"}
@@ -180,6 +303,14 @@ class LinuxAutomationHandler(BaseAutomationHandler):
# Keyboard Actions
async def key_down(self, key: str) -> Dict[str, Any]:
"""Press and hold a key.
Args:
key: The key to press down.
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
pyautogui.keyDown(key)
return {"success": True}
@@ -187,6 +318,14 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def key_up(self, key: str) -> Dict[str, Any]:
"""Release a key.
Args:
key: The key to release.
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
pyautogui.keyUp(key)
return {"success": True}
@@ -194,6 +333,14 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def type_text(self, text: str) -> Dict[str, Any]:
"""Type the specified text using the keyboard.
Args:
text: The text to type.
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
# use pynput for Unicode support
self.keyboard.type(text)
@@ -202,6 +349,14 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def press_key(self, key: str) -> Dict[str, Any]:
"""Press and release a key.
Args:
key: The key to press.
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
pyautogui.press(key)
return {"success": True}
@@ -209,6 +364,14 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def hotkey(self, keys: List[str]) -> Dict[str, Any]:
"""Press a combination of keys simultaneously.
Args:
keys: A list of keys to press together as a hotkey combination.
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
pyautogui.hotkey(*keys)
return {"success": True}
@@ -217,6 +380,15 @@ class LinuxAutomationHandler(BaseAutomationHandler):
# Scrolling Actions
async def scroll(self, x: int, y: int) -> Dict[str, Any]:
"""Scroll the mouse wheel.
Args:
x: The horizontal scroll amount.
y: The vertical scroll amount.
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
self.mouse.scroll(x, y)
return {"success": True}
@@ -224,6 +396,14 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def scroll_down(self, clicks: int = 1) -> Dict[str, Any]:
"""Scroll down by the specified number of clicks.
Args:
clicks: The number of scroll clicks to perform downward.
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
pyautogui.scroll(-clicks)
return {"success": True}
@@ -231,6 +411,14 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def scroll_up(self, clicks: int = 1) -> Dict[str, Any]:
"""Scroll up by the specified number of clicks.
Args:
clicks: The number of scroll clicks to perform upward.
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
pyautogui.scroll(clicks)
return {"success": True}
@@ -239,6 +427,12 @@ class LinuxAutomationHandler(BaseAutomationHandler):
# Screen Actions
async def screenshot(self) -> Dict[str, Any]:
"""Take a screenshot of the current screen.
Returns:
Dict[str, Any]: A dictionary containing success status and base64-encoded image data,
or error message if failed.
"""
try:
from PIL import Image
screenshot = pyautogui.screenshot()
@@ -253,6 +447,12 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": f"Screenshot error: {str(e)}"}
async def get_screen_size(self) -> Dict[str, Any]:
"""Get the size of the screen.
Returns:
Dict[str, Any]: A dictionary containing success status and screen dimensions,
or error message if failed.
"""
try:
size = pyautogui.size()
return {"success": True, "size": {"width": size.width, "height": size.height}}
@@ -260,6 +460,12 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def get_cursor_position(self) -> Dict[str, Any]:
"""Get the current position of the cursor.
Returns:
Dict[str, Any]: A dictionary containing success status and cursor coordinates,
or error message if failed.
"""
try:
pos = pyautogui.position()
return {"success": True, "position": {"x": pos.x, "y": pos.y}}
@@ -268,6 +474,12 @@ class LinuxAutomationHandler(BaseAutomationHandler):
# Clipboard Actions
async def copy_to_clipboard(self) -> Dict[str, Any]:
"""Get the current content of the clipboard.
Returns:
Dict[str, Any]: A dictionary containing success status and clipboard content,
or error message if failed.
"""
try:
import pyperclip
content = pyperclip.paste()
@@ -276,6 +488,14 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def set_clipboard(self, text: str) -> Dict[str, Any]:
"""Set the clipboard content to the specified text.
Args:
text: The text to copy to the clipboard.
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
import pyperclip
pyperclip.copy(text)
@@ -285,6 +505,15 @@ class LinuxAutomationHandler(BaseAutomationHandler):
# Command Execution
async def run_command(self, command: str) -> Dict[str, Any]:
"""Execute a shell command asynchronously.
Args:
command: The shell command to execute.
Returns:
Dict[str, Any]: A dictionary containing success status, stdout, stderr,
and return code, or error message if failed.
"""
try:
# Create subprocess
process = await asyncio.create_subprocess_shell(

View File

@@ -3,6 +3,12 @@ import re
from pydantic import BaseModel, Field, computed_field, validator, ConfigDict, RootModel
class DiskInfo(BaseModel):
"""Information about disk storage allocation.
Attributes:
total: Total disk space in bytes
allocated: Currently allocated disk space in bytes
"""
total: int
allocated: int
@@ -10,6 +16,15 @@ class VMConfig(BaseModel):
"""Configuration for creating a new VM.
Note: Memory and disk sizes should be specified with units (e.g., "4GB", "64GB")
Attributes:
name: Name of the virtual machine
os: Operating system type, either "macOS" or "linux"
cpu: Number of CPU cores to allocate
memory: Amount of memory to allocate with units
disk_size: Size of the disk to create with units
display: Display resolution in format "widthxheight"
ipsw: IPSW path or 'latest' for macOS VMs, None for other OS types
"""
name: str
os: Literal["macOS", "linux"] = "macOS"
@@ -23,7 +38,12 @@ class VMConfig(BaseModel):
populate_by_alias = True
class SharedDirectory(BaseModel):
"""Configuration for a shared directory."""
"""Configuration for a shared directory.
Attributes:
host_path: Path to the directory on the host system
read_only: Whether the directory should be mounted as read-only
"""
host_path: str = Field(..., alias="hostPath") # Allow host_path but serialize as hostPath
read_only: bool = False
@@ -50,6 +70,16 @@ class VMRunOpts(BaseModel):
)
def model_dump(self, **kwargs):
"""Export model data with proper field name conversion.
Converts shared directory fields to match API expectations when using aliases.
Args:
**kwargs: Keyword arguments passed to parent model_dump method
Returns:
dict: Model data with properly formatted field names
"""
data = super().model_dump(**kwargs)
# Convert shared directory fields to match API expectations
if self.shared_directories and "by_alias" in kwargs and kwargs["by_alias"]:
@@ -65,6 +95,18 @@ class VMRunOpts(BaseModel):
return data
class VMStatus(BaseModel):
"""Status information for a virtual machine.
Attributes:
name: Name of the virtual machine
status: Current status of the VM
os: Operating system type
cpu_count: Number of CPU cores allocated
memory_size: Amount of memory allocated in bytes
disk_size: Disk storage information
vnc_url: URL for VNC connection if available
ip_address: IP address of the VM if available
"""
name: str
status: str
os: Literal["macOS", "linux"]
@@ -80,38 +122,79 @@ class VMStatus(BaseModel):
@computed_field
@property
def state(self) -> str:
    """Expose ``status`` under the name ``state``.

    Returns:
        str: The value of the ``status`` field, unchanged.
    """
    current = self.status
    return current
@computed_field
@property
def cpu(self) -> int:
    """Expose ``cpu_count`` under the name ``cpu``.

    Returns:
        int: The value of the ``cpu_count`` field, unchanged.
    """
    cores = self.cpu_count
    return cores
@computed_field
@property
def memory(self) -> str:
    """Render ``memory_size`` as a whole number of gigabytes.

    Returns:
        str: ``memory_size`` divided by 1024**3, truncated to an integer
        and suffixed with ``GB``.
    """
    bytes_per_gb = 1024 * 1024 * 1024
    whole_gb = int(self.memory_size / bytes_per_gb)
    return f"{whole_gb}GB"
class VMUpdateOpts(BaseModel):
    """Optional changes to apply to an existing VM's configuration.

    Every field defaults to ``None``.

    Attributes:
        cpu: Number of CPU cores to update to.
        memory: Amount of memory to update to, with units.
        disk_size: Size of disk to update to, with units.
    """

    cpu: Optional[int] = None
    memory: Optional[str] = None
    disk_size: Optional[str] = None
class ImageRef(BaseModel):
    """Reference to a VM image.

    Attributes:
        image: Name of the image.
        tag: Tag version of the image (defaults to ``"latest"``).
        registry: Registry hostname where the image is stored
            (defaults to ``"ghcr.io"``).
        organization: Organization or namespace in the registry
            (defaults to ``"trycua"``).
    """

    image: str
    tag: str = "latest"
    registry: Optional[str] = "ghcr.io"
    organization: Optional[str] = "trycua"

    def model_dump(self, **kwargs):
        """Override model_dump to return just the ``image:tag`` format.

        ``registry`` and ``organization`` are not part of the output.

        Args:
            **kwargs: Accepted for signature compatibility and ignored.

        Returns:
            str: Image reference in ``"image:tag"`` format.
        """
        return f"{self.image}:{self.tag}"
class CloneSpec(BaseModel):
"""Specification for cloning a VM."""
"""Specification for cloning a VM.
Attributes:
name: Name of the source VM to clone
new_name: Name for the new cloned VM
"""
name: str
new_name: str = Field(alias="newName")
@@ -119,18 +202,44 @@ class CloneSpec(BaseModel):
populate_by_alias = True
class ImageInfo(BaseModel):
    """Model for individual image information.

    Attributes:
        imageId: Unique identifier for the image.
    """

    imageId: str
class ImageList(RootModel):
    """Response model for the images endpoint.

    A list-like container for ImageInfo objects that provides
    iteration, indexing, and ``len()`` by delegating to ``root``.
    """

    root: List[ImageInfo]

    def __iter__(self):
        """Iterate over the image list.

        Returns:
            Iterator over the ImageInfo objects held in ``root``.
        """
        return iter(self.root)

    def __getitem__(self, item):
        """Get an item from the image list by index.

        Args:
            item: Index or slice to retrieve.

        Returns:
            ImageInfo or list of ImageInfo objects.
        """
        return self.root[item]

    def __len__(self):
        """Get the number of images in the list.

        Returns:
            int: Number of images in the list.
        """
        return len(self.root)

View File

@@ -8,6 +8,13 @@ import type { AccessibilityNode, CursorPosition, MouseButton } from './base';
export class MacOSComputerInterface extends BaseComputerInterface {
// Mouse Actions
/**
* Press and hold a mouse button at the specified coordinates.
* @param {number} [x] - X coordinate for the mouse action
* @param {number} [y] - Y coordinate for the mouse action
* @param {MouseButton} [button='left'] - Mouse button to press down
* @returns {Promise<void>}
*/
async mouseDown(
x?: number,
y?: number,
@@ -16,6 +23,13 @@ export class MacOSComputerInterface extends BaseComputerInterface {
await this.sendCommand('mouse_down', { x, y, button });
}
/**
* Release a mouse button at the specified coordinates.
* @param {number} [x] - X coordinate for the mouse action
* @param {number} [y] - Y coordinate for the mouse action
* @param {MouseButton} [button='left'] - Mouse button to release
* @returns {Promise<void>}
*/
async mouseUp(
x?: number,
y?: number,
@@ -24,22 +38,54 @@ export class MacOSComputerInterface extends BaseComputerInterface {
await this.sendCommand('mouse_up', { x, y, button });
}
/**
 * Send a `left_click` command, optionally targeting a position.
 * @param {number} [x] - X coordinate for the click
 * @param {number} [y] - Y coordinate for the click
 * @returns {Promise<void>} Resolves once the command has been sent
 */
async leftClick(x?: number, y?: number): Promise<void> {
  const target = { x, y };
  await this.sendCommand('left_click', target);
}
/**
 * Send a `right_click` command, optionally targeting a position.
 * @param {number} [x] - X coordinate for the click
 * @param {number} [y] - Y coordinate for the click
 * @returns {Promise<void>} Resolves once the command has been sent
 */
async rightClick(x?: number, y?: number): Promise<void> {
  const target = { x, y };
  await this.sendCommand('right_click', target);
}
/**
 * Send a `double_click` command, optionally targeting a position.
 * @param {number} [x] - X coordinate for the double click
 * @param {number} [y] - Y coordinate for the double click
 * @returns {Promise<void>} Resolves once the command has been sent
 */
async doubleClick(x?: number, y?: number): Promise<void> {
  const target = { x, y };
  await this.sendCommand('double_click', target);
}
/**
 * Send a `move_cursor` command for the given position.
 * @param {number} x - X coordinate to move to
 * @param {number} y - Y coordinate to move to
 * @returns {Promise<void>} Resolves once the command has been sent
 */
async moveCursor(x: number, y: number): Promise<void> {
  const destination = { x, y };
  await this.sendCommand('move_cursor', destination);
}
/**
* Drag from current position to the specified coordinates.
* @param {number} x - X coordinate to drag to
* @param {number} y - Y coordinate to drag to
* @param {MouseButton} [button='left'] - Mouse button to use for dragging
* @param {number} [duration=0.5] - Duration of the drag operation in seconds
* @returns {Promise<void>}
*/
async dragTo(
x: number,
y: number,
@@ -49,6 +95,13 @@ export class MacOSComputerInterface extends BaseComputerInterface {
await this.sendCommand('drag_to', { x, y, button, duration });
}
/**
* Drag along a path of coordinates.
* @param {Array<[number, number]>} path - Array of [x, y] coordinate pairs to drag through
* @param {MouseButton} [button='left'] - Mouse button to use for dragging
* @param {number} [duration=0.5] - Duration of the drag operation in seconds
* @returns {Promise<void>}
*/
async drag(
path: Array<[number, number]>,
button: MouseButton = 'left',
@@ -58,40 +111,86 @@ export class MacOSComputerInterface extends BaseComputerInterface {
}
// Keyboard Actions
/**
 * Send a `key_down` command to press and hold a key.
 * @param {string} key - Key to press down
 * @returns {Promise<void>} Resolves once the command has been sent
 */
async keyDown(key: string): Promise<void> {
  const payload = { key };
  await this.sendCommand('key_down', payload);
}
/**
 * Send a `key_up` command to release a key.
 * @param {string} key - Key to release
 * @returns {Promise<void>} Resolves once the command has been sent
 */
async keyUp(key: string): Promise<void> {
  const payload = { key };
  await this.sendCommand('key_up', payload);
}
/**
 * Send a `type_text` command so the text is entered as keyboard input.
 * @param {string} text - Text to type
 * @returns {Promise<void>} Resolves once the command has been sent
 */
async typeText(text: string): Promise<void> {
  const payload = { text };
  await this.sendCommand('type_text', payload);
}
/**
 * Send a `press_key` command to press and release a key.
 * @param {string} key - Key to press
 * @returns {Promise<void>} Resolves once the command has been sent
 */
async pressKey(key: string): Promise<void> {
  const payload = { key };
  await this.sendCommand('press_key', payload);
}
/**
 * Send a `hotkey` command carrying all given keys as one combination.
 * @param {...string} keys - Keys to press together
 * @returns {Promise<void>} Resolves once the command has been sent
 */
async hotkey(...keys: string[]): Promise<void> {
  const combination = { keys };
  await this.sendCommand('hotkey', combination);
}
// Scrolling Actions
/**
 * Send a `scroll` command with horizontal and vertical amounts.
 * @param {number} x - Horizontal scroll amount
 * @param {number} y - Vertical scroll amount
 * @returns {Promise<void>} Resolves once the command has been sent
 */
async scroll(x: number, y: number): Promise<void> {
  const delta = { x, y };
  await this.sendCommand('scroll', delta);
}
/**
 * Send a `scroll_down` command for the given number of clicks.
 * @param {number} [clicks=1] - Number of scroll clicks
 * @returns {Promise<void>} Resolves once the command has been sent
 */
async scrollDown(clicks = 1): Promise<void> {
  const payload = { clicks };
  await this.sendCommand('scroll_down', payload);
}
/**
 * Send a `scroll_up` command for the given number of clicks.
 * @param {number} [clicks=1] - Number of scroll clicks
 * @returns {Promise<void>} Resolves once the command has been sent
 */
async scrollUp(clicks = 1): Promise<void> {
  const payload = { clicks };
  await this.sendCommand('scroll_up', payload);
}
// Screen Actions
/**
* Take a screenshot of the screen.
* @returns {Promise<Buffer>} Screenshot image data as a Buffer
* @throws {Error} If screenshot fails
*/
async screenshot(): Promise<Buffer> {
const response = await this.sendCommand('screenshot');
if (!response.image_data) {
@@ -100,6 +199,11 @@ export class MacOSComputerInterface extends BaseComputerInterface {
return Buffer.from(response.image_data as string, 'base64');
}
/**
* Get the current screen size.
* @returns {Promise<ScreenSize>} Screen dimensions
* @throws {Error} If unable to get screen size
*/
async getScreenSize(): Promise<ScreenSize> {
const response = await this.sendCommand('get_screen_size');
if (!response.success || !response.size) {
@@ -108,6 +212,11 @@ export class MacOSComputerInterface extends BaseComputerInterface {
return response.size as ScreenSize;
}
/**
* Get the current cursor position.
* @returns {Promise<CursorPosition>} Current cursor coordinates
* @throws {Error} If unable to get cursor position
*/
async getCursorPosition(): Promise<CursorPosition> {
const response = await this.sendCommand('get_cursor_position');
if (!response.success || !response.position) {
@@ -117,6 +226,11 @@ export class MacOSComputerInterface extends BaseComputerInterface {
}
// Clipboard Actions
/**
* Copy current selection to clipboard and return the content.
* @returns {Promise<string>} Clipboard content
* @throws {Error} If unable to get clipboard content
*/
async copyToClipboard(): Promise<string> {
const response = await this.sendCommand('copy_to_clipboard');
if (!response.success || !response.content) {
@@ -125,21 +239,42 @@ export class MacOSComputerInterface extends BaseComputerInterface {
return response.content as string;
}
/**
 * Send a `set_clipboard` command with the text to place on the clipboard.
 * @param {string} text - Text to set in clipboard
 * @returns {Promise<void>} Resolves once the command has been sent
 */
async setClipboard(text: string): Promise<void> {
  const payload = { text };
  await this.sendCommand('set_clipboard', payload);
}
// File System Actions
/**
 * Ask whether a file exists via the `file_exists` command.
 * @param {string} path - Path to the file
 * @returns {Promise<boolean>} The response's `exists` value, or false when it is falsy or missing
 */
async fileExists(path: string): Promise<boolean> {
  const reply = await this.sendCommand('file_exists', { path });
  const found = reply.exists as boolean;
  // A missing or falsy `exists` field is reported as false.
  return found || false;
}
/**
 * Ask whether a directory exists via the `directory_exists` command.
 * @param {string} path - Path to the directory
 * @returns {Promise<boolean>} The response's `exists` value, or false when it is falsy or missing
 */
async directoryExists(path: string): Promise<boolean> {
  const reply = await this.sendCommand('directory_exists', { path });
  const found = reply.exists as boolean;
  // A missing or falsy `exists` field is reported as false.
  return found || false;
}
/**
* List the contents of a directory.
* @param {string} path - Path to the directory
* @returns {Promise<string[]>} Array of file and directory names
* @throws {Error} If unable to list directory
*/
async listDir(path: string): Promise<string[]> {
const response = await this.sendCommand('list_dir', { path });
if (!response.success) {
@@ -148,6 +283,12 @@ export class MacOSComputerInterface extends BaseComputerInterface {
return (response.files as string[]) || [];
}
/**
* Get the size of a file in bytes.
* @param {string} path - Path to the file
* @returns {Promise<number>} File size in bytes
* @throws {Error} If unable to get file size
*/
async getFileSize(path: string): Promise<number> {
const response = await this.sendCommand('get_file_size', { path });
if (!response.success) {
@@ -156,6 +297,16 @@ export class MacOSComputerInterface extends BaseComputerInterface {
return (response.size as number) || 0;
}
/**
* Read file content in chunks for large files.
* @private
* @param {string} path - Path to the file
* @param {number} offset - Starting byte offset
* @param {number} totalLength - Total number of bytes to read
* @param {number} [chunkSize=1048576] - Size of each chunk in bytes
* @returns {Promise<Buffer>} File content as Buffer
* @throws {Error} If unable to read file chunk
*/
private async readBytesChunked(
path: string,
offset: number,
@@ -190,6 +341,16 @@ export class MacOSComputerInterface extends BaseComputerInterface {
return Buffer.concat(chunks);
}
/**
* Write file content in chunks for large files.
* @private
* @param {string} path - Path to the file
* @param {Buffer} content - Content to write
* @param {boolean} [append=false] - Whether to append to existing file
* @param {number} [chunkSize=1048576] - Size of each chunk in bytes
* @returns {Promise<void>}
* @throws {Error} If unable to write file chunk
*/
private async writeBytesChunked(
path: string,
content: Buffer,
@@ -222,36 +383,43 @@ export class MacOSComputerInterface extends BaseComputerInterface {
}
}
/**
 * Read a whole file through `readBytes` and decode it to a string.
 * @param {string} path - Path to the file to read
 * @param {BufferEncoding} [encoding='utf8'] - Text encoding to use
 * @returns {Promise<string>} The decoded text content of the file
 */
async readText(path: string, encoding: BufferEncoding = 'utf8'): Promise<string> {
  const raw = await this.readBytes(path);
  const decoded = raw.toString(encoding);
  return decoded;
}
/**
 * Encode text to bytes and write it to a file through `writeBytes`.
 * @param {string} path - Path to the file to write
 * @param {string} content - Text content to write
 * @param {BufferEncoding} [encoding='utf8'] - Text encoding to use
 * @param {boolean} [append=false] - Whether to append to the file instead of overwriting
 * @returns {Promise<void>} Resolves once `writeBytes` completes
 */
async writeText(
  path: string,
  content: string,
  encoding: BufferEncoding = 'utf8',
  append: boolean = false
): Promise<void> {
  const encoded = Buffer.from(content, encoding);
  await this.writeBytes(path, encoded, append);
}
/**
* Read bytes from a file, with optional offset and length.
* @param {string} path - Path to the file
* @param {number} [offset=0] - Starting byte offset
* @param {number} [length] - Number of bytes to read (reads entire file if not specified)
* @returns {Promise<Buffer>} File content as Buffer
* @throws {Error} If unable to read file
*/
async readBytes(path: string, offset: number = 0, length?: number): Promise<Buffer> {
// For large files, use chunked reading
if (length === undefined) {
@@ -275,6 +443,14 @@ export class MacOSComputerInterface extends BaseComputerInterface {
return Buffer.from(response.content_b64 as string, 'base64');
}
/**
* Write bytes to a file.
* @param {string} path - Path to the file
* @param {Buffer} content - Content to write as Buffer
* @param {boolean} [append=false] - Whether to append to existing file
* @returns {Promise<void>}
* @throws {Error} If unable to write file
*/
async writeBytes(path: string, content: Buffer, append: boolean = false): Promise<void> {
// For large files, use chunked writing
if (content.length > 5 * 1024 * 1024) {
@@ -293,6 +469,12 @@ export class MacOSComputerInterface extends BaseComputerInterface {
}
}
/**
* Delete a file at the specified path.
* @param {string} path - Path to the file to delete
* @returns {Promise<void>}
* @throws {Error} If unable to delete file
*/
async deleteFile(path: string): Promise<void> {
const response = await this.sendCommand('delete_file', { path });
if (!response.success) {
@@ -300,6 +482,12 @@ export class MacOSComputerInterface extends BaseComputerInterface {
}
}
/**
* Create a directory at the specified path.
* @param {string} path - Path where to create the directory
* @returns {Promise<void>}
* @throws {Error} If unable to create directory
*/
async createDir(path: string): Promise<void> {
const response = await this.sendCommand('create_dir', { path });
if (!response.success) {
@@ -309,6 +497,12 @@ export class MacOSComputerInterface extends BaseComputerInterface {
}
}
/**
* Delete a directory at the specified path.
* @param {string} path - Path to the directory to delete
* @returns {Promise<void>}
* @throws {Error} If unable to delete directory
*/
async deleteDir(path: string): Promise<void> {
const response = await this.sendCommand('delete_dir', { path });
if (!response.success) {
@@ -318,6 +512,12 @@ export class MacOSComputerInterface extends BaseComputerInterface {
}
}
/**
* Execute a shell command and return stdout and stderr.
* @param {string} command - Command to execute
* @returns {Promise<[string, string]>} Tuple of [stdout, stderr]
* @throws {Error} If command execution fails
*/
async runCommand(command: string): Promise<[string, string]> {
const response = await this.sendCommand('run_command', { command });
if (!response.success) {
@@ -330,6 +530,11 @@ export class MacOSComputerInterface extends BaseComputerInterface {
}
// Accessibility Actions
/**
* Get the accessibility tree of the current screen.
* @returns {Promise<AccessibilityNode>} Root accessibility node
* @throws {Error} If unable to get accessibility tree
*/
async getAccessibilityTree(): Promise<AccessibilityNode> {
const response = await this.sendCommand('get_accessibility_tree');
if (!response.success) {
@@ -340,6 +545,13 @@ export class MacOSComputerInterface extends BaseComputerInterface {
return response as unknown as AccessibilityNode;
}
/**
* Convert coordinates to screen coordinates.
* @param {number} x - X coordinate to convert
* @param {number} y - Y coordinate to convert
* @returns {Promise<[number, number]>} Converted screen coordinates as [x, y]
* @throws {Error} If coordinate conversion fails
*/
async toScreenCoordinates(x: number, y: number): Promise<[number, number]> {
const response = await this.sendCommand('to_screen_coordinates', { x, y });
if (!response.success || !response.coordinates) {
@@ -348,6 +560,13 @@ export class MacOSComputerInterface extends BaseComputerInterface {
return response.coordinates as [number, number];
}
/**
* Convert coordinates to screenshot coordinates.
* @param {number} x - X coordinate to convert
* @param {number} y - Y coordinate to convert
* @returns {Promise<[number, number]>} Converted screenshot coordinates as [x, y]
* @throws {Error} If coordinate conversion fails
*/
async toScreenshotCoordinates(
x: number,
y: number