Merge branch 'main' into models/opencua

This commit is contained in:
Dillon DuPont
2025-09-15 15:11:15 -04:00
35 changed files with 9754 additions and 137 deletions

View File

@@ -31,7 +31,8 @@ from .callbacks import (
TrajectorySaverCallback,
BudgetManagerCallback,
TelemetryCallback,
OperatorNormalizerCallback
OperatorNormalizerCallback,
PromptInstructionsCallback,
)
from .computers import (
AsyncComputerHandler,
@@ -162,6 +163,7 @@ class ComputerAgent:
custom_loop: Optional[Callable] = None,
only_n_most_recent_images: Optional[int] = None,
callbacks: Optional[List[Any]] = None,
instructions: Optional[str] = None,
verbosity: Optional[int] = None,
trajectory_dir: Optional[str | Path | dict] = None,
max_retries: Optional[int] = 3,
@@ -181,6 +183,7 @@ class ComputerAgent:
custom_loop: Custom agent loop function to use instead of auto-selection
only_n_most_recent_images: If set, only keep the N most recent images in message history. Adds ImageRetentionCallback automatically.
callbacks: List of AsyncCallbackHandler instances for preprocessing/postprocessing
instructions: Optional system instructions to be passed to the model
verbosity: Logging level (logging.DEBUG, logging.INFO, etc.). If set, adds LoggingCallback automatically
trajectory_dir: If set, saves trajectory data (screenshots, responses) to this directory. Adds TrajectorySaverCallback automatically.
max_retries: Maximum number of retries for failed API calls
@@ -200,6 +203,7 @@ class ComputerAgent:
self.custom_loop = custom_loop
self.only_n_most_recent_images = only_n_most_recent_images
self.callbacks = callbacks or []
self.instructions = instructions
self.verbosity = verbosity
self.trajectory_dir = trajectory_dir
self.max_retries = max_retries
@@ -214,6 +218,10 @@ class ComputerAgent:
# Prepend operator normalizer callback
self.callbacks.insert(0, OperatorNormalizerCallback())
# Add prompt instructions callback if provided
if self.instructions:
self.callbacks.append(PromptInstructionsCallback(self.instructions))
# Add telemetry callback if telemetry_enabled is set
if self.telemetry_enabled:
if isinstance(self.telemetry_enabled, bool):
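Taken together, these hunks make `instructions` a thin convenience over the callback: the two constructions below are equivalent in effect (a minimal sketch; the model string and rule text are illustrative):

from agent.agent import ComputerAgent
from agent.callbacks import PromptInstructionsCallback

# Option A: the new convenience kwarg
agent_a = ComputerAgent(
    model="openai/computer-use-preview",
    instructions="Never submit a form without verifying field values.",
)

# Option B: explicit callback, same effect
agent_b = ComputerAgent(
    model="openai/computer-use-preview",
    callbacks=[PromptInstructionsCallback("Never submit a form without verifying field values.")],
)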

View File

@@ -9,6 +9,7 @@ from .trajectory_saver import TrajectorySaverCallback
from .budget_manager import BudgetManagerCallback
from .telemetry import TelemetryCallback
from .operator_validator import OperatorNormalizerCallback
from .prompt_instructions import PromptInstructionsCallback
__all__ = [
"AsyncCallbackHandler",
@@ -18,4 +19,5 @@ __all__ = [
"BudgetManagerCallback",
"TelemetryCallback",
"OperatorNormalizerCallback",
"PromptInstructionsCallback",
]

View File

@@ -0,0 +1,47 @@
"""
Prompt instructions callback.
This callback allows simple prompt engineering by prepending a user
instructions message to the start of the conversation before each LLM call.
Usage:
from agent.callbacks import PromptInstructionsCallback
agent = ComputerAgent(
model="openai/computer-use-preview",
callbacks=[PromptInstructionsCallback("Follow these rules...")]
)
"""
from typing import Any, Dict, List, Optional
from .base import AsyncCallbackHandler
class PromptInstructionsCallback(AsyncCallbackHandler):
"""
Prepend a user instructions message to the message list.
This is a minimal, non-invasive way to guide the agent's behavior without
modifying agent loops or tools. It works with any provider/loop since it
only alters the messages array before sending to the model.
"""
def __init__(self, instructions: Optional[str]) -> None:
self.instructions = instructions
async def on_llm_start(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
# Prepend the instructions message
if not self.instructions:
return messages
# Ensure we don't duplicate if already present at the front
if messages and isinstance(messages[0], dict):
first = messages[0]
if first.get("role") == "user" and first.get("content") == self.instructions:
return messages
return [
{"role": "user", "content": self.instructions},
] + messages
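A quick standalone check of the idempotence guard above (a sketch):

import asyncio
from agent.callbacks import PromptInstructionsCallback

async def demo() -> None:
    cb = PromptInstructionsCallback("Follow these rules...")
    msgs = [{"role": "user", "content": "Open the browser"}]
    once = await cb.on_llm_start(msgs)   # instructions prepended
    twice = await cb.on_llm_start(once)  # guard detects the existing prefix
    assert once == twice                 # no duplicate instructions message

asyncio.run(demo())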

View File

@@ -1,102 +1,28 @@
"""HUD integration: Generic HuggingFace dataset evaluation runner (CUA proxy).
"""HUD integration: dataset runners and MCP-based computer agent export.
This module exposes two helpers to evaluate HUD-compatible datasets using
HUD's OperatorAgent, while proxying model calls through our ComputerAgent via
`FakeAsyncOpenAI` (see `agent/integrations/hud/agent.py`).
This module exposes helpers to evaluate HUD-compatible datasets and exports
the MCP-compatible computer agent implementation.
Exports:
- run_single_task(dataset_name, *, agent_type="cua-proxy", model=None, allowed_tools=None)
- run_full_dataset(dataset_name, *, agent_type="cua-proxy", model=None, allowed_tools=None, max_concurrent=30, max_steps=50)
- run_single_task(dataset, ...)
- run_full_dataset(dataset, ...)
- MCPComputerAgent
"""
import time
from typing import Any, Optional
from PIL import Image
from agent.computers import is_agent_computer
from datasets import load_dataset, Dataset
from hud.agents import OperatorAgent
from hud.datasets import Task, run_dataset
from hud.tools.computer.settings import computer_settings
from hud import trace
from agent.agent import ComputerAgent as BaseComputerAgent
from .proxy import FakeAsyncOpenAI
# ---------------------------------------------------------------------------
# Proxy OperatorAgent
# ---------------------------------------------------------------------------
class ProxyOperatorAgent(OperatorAgent):
"""OperatorAgent that proxies model calls through our ComputerAgent.
Accepts the same config keys we pass via hud.run_dataset `agent_config`:
- model: str | None
- allowed_tools: list[str] | None
Additional kwargs are forwarded to OperatorAgent (if any are supported).
"""
def __init__(
self,
*,
model: str | None = None,
allowed_tools: list[str] | None = None,
trajectory_dir: str | dict | None = None,
# === ComputerAgent kwargs ===
tools: list[Any] | None = None,
custom_loop: Any | None = None,
only_n_most_recent_images: int | None = None,
callbacks: list[Any] | None = None,
verbosity: int | None = None,
max_retries: int | None = 3,
screenshot_delay: float | int = 0.5,
use_prompt_caching: bool | None = False,
max_trajectory_budget: float | dict | None = None,
telemetry_enabled: bool | None = True,
**kwargs: Any,
) -> None:
model = model or "computer-use-preview"
allowed_tools = allowed_tools or ["openai_computer"]
computer_shim = {
'screenshot': lambda: Image.new('RGB', (computer_settings.OPENAI_COMPUTER_WIDTH, computer_settings.OPENAI_COMPUTER_HEIGHT)),
'environment': 'linux',
'dimensions': (computer_settings.OPENAI_COMPUTER_WIDTH, computer_settings.OPENAI_COMPUTER_HEIGHT)
}
# Build tools ensuring the computer_shim is included
agent_tools: list[Any] = [computer_shim]
if tools:
agent_tools.extend(tools)
computer_agent = BaseComputerAgent(
model=model,
tools=agent_tools,
custom_loop=custom_loop,
only_n_most_recent_images=only_n_most_recent_images,
callbacks=callbacks,
verbosity=verbosity,
trajectory_dir=trajectory_dir,
max_retries=max_retries,
screenshot_delay=screenshot_delay,
use_prompt_caching=use_prompt_caching,
max_trajectory_budget=max_trajectory_budget,
telemetry_enabled=telemetry_enabled,
)
model_client = FakeAsyncOpenAI(computer_agent)
super().__init__(
model_client=model_client, # type: ignore[arg-type]
model=model,
allowed_tools=allowed_tools,
**kwargs,
)
from .agent import MCPComputerAgent
# ---------------------------------------------------------------------------
# Single-task runner
# ---------------------------------------------------------------------------
async def run_single_task(
dataset: str | Dataset | list[dict[str, Any]],
*,
@@ -108,6 +34,7 @@ async def run_single_task(
custom_loop: Any | None = None,
only_n_most_recent_images: int | None = None,
callbacks: list[Any] | None = None,
instructions: str | None = None,
verbosity: int | None = None,
trajectory_dir: str | dict | None = None,
max_retries: int | None = 3,
@@ -116,7 +43,7 @@ async def run_single_task(
max_trajectory_budget: float | dict | None = None,
telemetry_enabled: bool | None = True,
) -> None:
"""Load one task from the dataset and execute it with Operator+CUA proxy."""
"""Load one task from the dataset and execute it with MCPComputerAgent."""
# Load dataset and pick a sample
if isinstance(dataset, str):
@@ -129,17 +56,27 @@ async def run_single_task(
sample_task = dataset[task_id] # type: ignore[index]
task_prompt = sample_task.get("prompt", f"Task {sample_task.get('id', 0)}") # type: ignore[attr-defined]
# Filter any existing Computer tools
# The eval framework will add its own Computer tool per task
if tools:
tools = [
tool
for tool in tools
if not is_agent_computer(tool)
]
with trace(name=task_prompt):
task = Task(**sample_task) # type: ignore[arg-type]
agent = ProxyOperatorAgent(
model=model,
allowed_tools=allowed_tools,
agent = MCPComputerAgent(
model=model or "computer-use-preview",
allowed_tools=allowed_tools or ["openai_computer"],
# === ComputerAgent kwargs passthrough ===
tools=tools,
custom_loop=custom_loop,
only_n_most_recent_images=only_n_most_recent_images,
callbacks=callbacks,
instructions=instructions,
verbosity=verbosity,
trajectory_dir=trajectory_dir,
max_retries=max_retries,
@@ -157,7 +94,6 @@ async def run_single_task(
# Full-dataset runner
# ---------------------------------------------------------------------------
async def run_full_dataset(
dataset: str | Dataset | list[dict[str, Any]],
*,
@@ -173,6 +109,7 @@ async def run_full_dataset(
custom_loop: Any | None = None,
only_n_most_recent_images: int | None = 5,
callbacks: list[Any] | None = None,
instructions: str | None = None,
verbosity: int | None = None,
max_retries: int | None = 3,
screenshot_delay: float | int = 0.5,
@@ -182,9 +119,7 @@ async def run_full_dataset(
) -> list[Any]:
"""Run evaluation across the entire dataset using hud.datasets.run_dataset."""
# We pass OperatorAgent as the class and provide a config that injects our
# FakeAsyncOpenAI per agent instantiation.
# Run with our MCP-based agent class.
if isinstance(dataset, str):
dataset_name = dataset.split('/')[-1]
job_name = job_name or f"Evaluation {dataset_name}"
@@ -193,11 +128,20 @@ async def run_full_dataset(
dataset_name = "custom"
job_name = job_name or f"Evaluation {time.strftime('%H:%M %Y-%m-%d')}"
# Filter any existing Computer tools
# The eval framework will add its own Computer tool per task
if tools:
tools = [
tool
for tool in tools
if not is_agent_computer(tool)
]
# Execute evaluation
return await run_dataset(
name=job_name,
dataset=dataset,
agent_class=ProxyOperatorAgent,
agent_class=MCPComputerAgent,
agent_config={
"model": model,
"allowed_tools": allowed_tools,
@@ -207,6 +151,7 @@ async def run_full_dataset(
"custom_loop": custom_loop,
"only_n_most_recent_images": only_n_most_recent_images,
"callbacks": callbacks,
"instructions": instructions,
"verbosity": verbosity,
"max_retries": max_retries,
"screenshot_delay": screenshot_delay,
@@ -224,5 +169,5 @@ async def run_full_dataset(
__all__ = [
"run_single_task",
"run_full_dataset",
"ProxyOperatorAgent",
"MCPComputerAgent",
]
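For reference, a hedged usage sketch of the updated runner (the dataset id and rule text are placeholders):

import asyncio
from agent.integrations.hud import run_single_task

asyncio.run(run_single_task(
    "hud-evals/example-dataset",  # placeholder dataset id
    model="openai/computer-use-preview",
    instructions="Work autonomously; do not ask for confirmation.",
))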

View File

@@ -0,0 +1,351 @@
"""MCP-compatible Computer Agent for HUD integration.
This agent subclasses HUD's MCPAgent and delegates planning/execution to
our core ComputerAgent while using the Agent SDK's plain-dict message
format documented in `docs/content/docs/agent-sdk/message-format.mdx`.
Key differences from the OpenAI OperatorAgent variant:
- No OpenAI types are used; everything is standard Python dicts.
- Planning is executed via `ComputerAgent.run(messages)`.
- The first yielded result per step is returned as the agent response.
"""
from __future__ import annotations
import io
from typing import Any, ClassVar, Optional
from agent.agent import ComputerAgent as BaseComputerAgent
from agent.callbacks import PromptInstructionsCallback
from agent.callbacks.trajectory_saver import TrajectorySaverCallback
from hud.agents import MCPAgent
from hud.tools.computer.settings import computer_settings
from hud.types import AgentResponse, MCPToolCall, MCPToolResult, Trace
from agent.responses import make_failed_tool_call_items
from agent.computers import is_agent_computer
from PIL import Image
import mcp.types as types
import hud
import uuid
import base64
from pathlib import Path
class MCPComputerAgent(MCPAgent):
"""MCP agent that uses ComputerAgent for planning and tools for execution.
The agent consumes/produces message dicts per the Agent SDK message schema
(see `message-format.mdx`).
"""
metadata: ClassVar[dict[str, Any]] = {
"display_width": computer_settings.OPENAI_COMPUTER_WIDTH,
"display_height": computer_settings.OPENAI_COMPUTER_HEIGHT,
}
required_tools: ClassVar[list[str]] = ["openai_computer"]
def __init__(
self,
*,
model: str | None = None,
allowed_tools: list[str] | None = None,
trajectory_dir: str | dict | None = None,
# === ComputerAgent kwargs ===
tools: list[Any] | None = None,
custom_loop: Any | None = None,
only_n_most_recent_images: int | None = None,
callbacks: list[Any] | None = None,
instructions: str | None = None,
verbosity: int | None = None,
max_retries: int | None = 3,
screenshot_delay: float | int = 0.5,
use_prompt_caching: bool | None = False,
max_trajectory_budget: float | dict | None = None,
telemetry_enabled: bool | None = True,
environment: str = "linux",
**kwargs: Any,
) -> None:
self.allowed_tools = allowed_tools or ["openai_computer"]
super().__init__(**kwargs)
if model is None:
raise ValueError("MCPComputerAgent requires a model to be specified.")
self.model = model
self.environment = environment
# Update model name for HUD logging
self.model_name = "cua-" + self.model
# Stateful tracking of tool call inputs
self.tool_call_inputs: dict[str, list[dict[str, Any]]] = {}
self.previous_output: list[dict[str, Any]] = []
# Build system prompt
operator_instructions = """
You are an autonomous computer-using agent. Follow these guidelines:
1. NEVER ask for confirmation. Complete all tasks autonomously.
2. Do NOT send messages like "I need to confirm before..." or "Do you want me to continue?" - just proceed.
3. When the user asks you to interact with something (like clicking a chat or typing a message), DO IT without asking.
4. Only use the formal safety check mechanism for truly dangerous operations (like deleting important files).
5. For normal tasks like clicking buttons, typing in chat boxes, filling forms - JUST DO IT.
6. The user has already given you permission by running this agent. No further confirmation is needed.
7. Be decisive and action-oriented. Complete the requested task fully.
Remember: You are expected to complete tasks autonomously. The user trusts you to do what they asked.
""".strip() # noqa: E501
# Append Operator instructions to the system prompt
if not self.system_prompt:
self.system_prompt = operator_instructions
else:
self.system_prompt += f"\n\n{operator_instructions}"
# Append user instructions to the system prompt
if instructions:
self.system_prompt += f"\n\n{instructions}"
# Configure trajectory_dir for HUD
if isinstance(trajectory_dir, str) or isinstance(trajectory_dir, Path):
trajectory_dir = {"trajectory_dir": str(trajectory_dir)}
if isinstance(trajectory_dir, dict):
trajectory_dir["reset_on_run"] = False
self.last_screenshot_b64 = None
buffer = io.BytesIO()
Image.new('RGB', (self.metadata["display_width"], self.metadata["display_height"])).save(buffer, format='PNG')
self.last_screenshot_b64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
# Ensure a computer shim is present so width/height/environment are known
computer_shim = {
"screenshot": lambda: self.last_screenshot_b64,
"environment": self.environment,
"dimensions": (
self.metadata["display_width"],
self.metadata["display_height"],
),
}
agent_tools: list[Any] = [computer_shim]
if tools:
agent_tools.extend([
tool
for tool in tools
if not is_agent_computer(tool)
])
agent_kwargs = {
"model": self.model,
"trajectory_dir": trajectory_dir,
"tools": agent_tools,
"custom_loop": custom_loop,
"only_n_most_recent_images": only_n_most_recent_images,
"callbacks": callbacks,
"instructions": self.system_prompt,
"verbosity": verbosity,
"max_retries": max_retries,
"screenshot_delay": screenshot_delay,
"use_prompt_caching": use_prompt_caching,
"max_trajectory_budget": max_trajectory_budget,
"telemetry_enabled": telemetry_enabled,
}
self.computer_agent = BaseComputerAgent(
**agent_kwargs
)
async def get_system_messages(self) -> list[Any]:
"""Create initial messages.
Unused - ComputerAgent handles this with the 'instructions' parameter.
"""
return []
async def format_blocks(
self, blocks: list[types.ContentBlock]
) -> list[dict[str, Any]]:
"""
Format blocks for OpenAI input format.
Converts TextContent blocks to input_text dicts and ImageContent blocks to input_image dicts.
""" # noqa: E501
formatted = []
for block in blocks:
if isinstance(block, types.TextContent):
formatted.append({"type": "input_text", "text": block.text})
elif isinstance(block, types.ImageContent):
mime_type = getattr(block, "mimeType", "image/png")
formatted.append(
{"type": "input_image", "image_url": f"data:{mime_type};base64,{block.data}"}
)
self.last_screenshot_b64 = block.data
return [{"role": "user", "content": formatted}]
@hud.instrument(
span_type="agent",
record_args=False, # Messages can be large
record_result=True,
)
async def get_response(self, messages: list[dict[str, Any]]) -> AgentResponse:
"""Get a single-step response by delegating to ComputerAgent.run.
Returns an Agent SDK-style response dict:
{ "output": [AgentMessage, ...], "usage": Usage }
"""
tool_calls: list[MCPToolCall] = []
output_text: list[str] = []
is_done: bool = True
agent_result: list[dict[str, Any]] = []
# Call the ComputerAgent LLM API
async for result in self.computer_agent.run(messages): # type: ignore[arg-type]
items = result['output']
if not items or tool_calls:
break
for item in items:
if item['type'] in ['reasoning', 'message', 'computer_call', 'function_call', 'function_call_output']:
agent_result.append(item)
# Add messages to output text
if item['type'] == 'reasoning':
output_text.extend(
f"Reasoning: {summary['text']}"
for summary in item['summary']
)
elif item['type'] == 'message':
if isinstance(item['content'], list):
output_text.extend(
part['text']
for part in item['content']
if part['type'] == 'output_text'
)
elif isinstance(item['content'], str):
output_text.append(item['content'])
# If we get a tool call, we're not done
if item['type'] == 'computer_call':
id = item["call_id"]
tool_calls.append(MCPToolCall(
name="openai_computer",
arguments=item["action"],
id=id,
))
is_done = False
self.tool_call_inputs[id] = agent_result
break
# If we have tool calls, exit the loop
if tool_calls:
break
self.previous_output = agent_result
return AgentResponse(
content="\n".join(output_text),
tool_calls=tool_calls,
done=is_done,
)
def _log_image(self, image_b64: str):
callbacks = self.computer_agent.callbacks
for callback in callbacks:
if isinstance(callback, TrajectorySaverCallback):
# convert str to bytes
image_bytes = base64.b64decode(image_b64)
callback._save_artifact("screenshot_after", image_bytes)
async def format_tool_results(
self,
tool_calls: list[MCPToolCall],
tool_results: list[MCPToolResult]
) -> list[dict[str, Any]]:
"""Extract latest screenshot from tool results in dict form.
Expects results to already be in the message-format content dicts.
Returns a list of input content dicts suitable for follow-up calls.
"""
messages = []
for call, result in zip(tool_calls, tool_results):
if call.id not in self.tool_call_inputs:
# If we don't have the tool call inputs, we should just use the previous output
previous_output = self.previous_output.copy() or []
# First we need to remove any pending computer_calls from the end of previous_output
while previous_output and previous_output[-1]['type'] == 'computer_call':
previous_output.pop()
messages.extend(previous_output)
# If the call is a 'response', don't add the result
if call.name == 'response':
continue
# Otherwise, if we have a result, we should add it to the messages
content = [
{ "type": "input_text", "text": content.text } if isinstance(content, types.TextContent)
else { "type": "input_image", "image_url": f"data:image/png;base64,{content.data}" } if isinstance(content, types.ImageContent)
else { "type": "input_text", "text": "" }
for content in result.content
]
messages.append({
"role": "user",
"content": content,
})
continue
# Add the assistant's computer call
messages.extend(self.tool_call_inputs[call.id])
if result.isError:
error_text = "".join([
content.text
for content in result.content
if isinstance(content, types.TextContent)
])
# Replace computer call with failed tool call
messages.pop()
messages.extend(make_failed_tool_call_items(
tool_name=call.name,
tool_kwargs=call.arguments or {},
error_message=error_text,
call_id=call.id,
))
else:
# Get the latest screenshot
screenshots = [
content.data
for content in result.content
if isinstance(content, types.ImageContent)
]
# Add the resulting screenshot
if screenshots:
self._log_image(screenshots[0])
self.last_screenshot_b64 = screenshots[0]
messages.append({
"type": "computer_call_output",
"call_id": call.id,
"output": {
"type": "input_image",
"image_url": f"data:image/png;base64,{screenshots[0]}"
},
})
else:
# Otherwise, replace computer call with failed tool call
messages.pop()
messages.extend(make_failed_tool_call_items(
tool_name=call.name,
tool_kwargs=call.arguments or {},
error_message="No screenshots returned.",
call_id=call.id,
))
return messages
__all__ = [
"MCPComputerAgent",
]
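For readers unfamiliar with the Agent SDK message schema referenced above, a condensed sketch of the dict shapes this agent produces and consumes (values, including the action payload, are illustrative):

messages = [
    # user turn, as produced by format_blocks()
    {"role": "user", "content": [
        {"type": "input_text", "text": "Open the settings page"},
        {"type": "input_image", "image_url": "data:image/png;base64,<b64>"},
    ]},
    # model action, as emitted by get_response()
    {"type": "computer_call", "call_id": "call_0",
     "action": {"type": "click", "x": 100, "y": 200}},
    # tool result, as produced by format_tool_results()
    {"type": "computer_call_output", "call_id": "call_0",
     "output": {"type": "input_image", "image_url": "data:image/png;base64,<b64>"}},
]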

View File

@@ -13,6 +13,10 @@ import uuid
from typing import Any, Dict, List, Optional
from agent.agent import ComputerAgent as BaseComputerAgent
from agent.callbacks import PromptInstructionsCallback
from hud.tools.computer.settings import computer_settings
from PIL import Image
from hud.agents import OperatorAgent
# OpenAI Responses typed models (required)
from openai.types.responses import (
@@ -178,6 +182,83 @@ class FakeAsyncOpenAI:
print(traceback.format_exc())
raise e
# ---------------------------------------------------------------------------
# Proxy OperatorAgent (moved from __init__.py)
# ---------------------------------------------------------------------------
class ProxyOperatorAgent(OperatorAgent):
"""OperatorAgent that proxies model calls through our ComputerAgent.
Accepts the same config keys we pass via hud.run_dataset `agent_config`:
- model: str | None
- allowed_tools: list[str] | None
Additional kwargs are forwarded to OperatorAgent (if any are supported).
"""
def __init__(
self,
*,
model: str | None = None,
allowed_tools: list[str] | None = None,
trajectory_dir: str | dict | None = None,
# === ComputerAgent kwargs ===
tools: list[Any] | None = None,
custom_loop: Any | None = None,
only_n_most_recent_images: int | None = None,
callbacks: list[Any] | None = None,
instructions: str | None = None,
verbosity: int | None = None,
max_retries: int | None = 3,
screenshot_delay: float | int = 0.5,
use_prompt_caching: bool | None = False,
max_trajectory_budget: float | dict | None = None,
telemetry_enabled: bool | None = True,
**kwargs: Any,
) -> None:
model = model or "computer-use-preview"
allowed_tools = allowed_tools or ["openai_computer"]
computer_shim = {
'screenshot': lambda: Image.new('RGB', (computer_settings.OPENAI_COMPUTER_WIDTH, computer_settings.OPENAI_COMPUTER_HEIGHT)),
'environment': 'linux',
'dimensions': (computer_settings.OPENAI_COMPUTER_WIDTH, computer_settings.OPENAI_COMPUTER_HEIGHT)
}
# Build tools ensuring the computer_shim is included
agent_tools: list[Any] = [computer_shim]
if tools:
agent_tools.extend(tools)
# Build callbacks, injecting prompt instructions if provided
agent_callbacks = list(callbacks or [])
if instructions:
agent_callbacks.append(PromptInstructionsCallback(instructions))
computer_agent = BaseComputerAgent(
model=model,
tools=agent_tools,
custom_loop=custom_loop,
only_n_most_recent_images=only_n_most_recent_images,
callbacks=agent_callbacks,
verbosity=verbosity,
trajectory_dir=trajectory_dir,
max_retries=max_retries,
screenshot_delay=screenshot_delay,
use_prompt_caching=use_prompt_caching,
max_trajectory_budget=max_trajectory_budget,
telemetry_enabled=telemetry_enabled,
)
model_client = FakeAsyncOpenAI(computer_agent)
super().__init__(
model_client=model_client, # type: ignore[arg-type]
model=model,
allowed_tools=allowed_tools,
**kwargs,
)
__all__ = [
"FakeAsyncOpenAI",
"ProxyOperatorAgent",
]
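A minimal construction sketch (rule text illustrative; HUD supplies the environment and task at run time):

agent = ProxyOperatorAgent(
    model="computer-use-preview",
    instructions="Prefer keyboard shortcuts over mouse travel.",
)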

View File

@@ -61,7 +61,7 @@ cli = [
"yaspin>=3.1.0",
]
hud = [
"hud-python>=0.4.12,<0.5.0",
"hud-python==0.4.26",
]
all = [
# uitars requirements
@@ -78,7 +78,7 @@ all = [
# cli requirements
"yaspin>=3.1.0",
# hud requirements
"hud-python>=0.4.12,<0.5.0",
"hud-python==0.4.26",
]
[tool.uv]

View File

@@ -20,6 +20,12 @@ logger = logging.getLogger(__name__)
automation_handler = MacOSAutomationHandler()
class Diorama:
"""Virtual desktop manager that provides automation capabilities for macOS applications.
Manages application windows and provides an interface for taking screenshots,
mouse interactions, keyboard input, and coordinate transformations between
screenshot space and screen space.
"""
_scheduler_queue = None
_scheduler_task = None
_loop = None
@@ -27,6 +33,14 @@ class Diorama:
@classmethod
def create_from_apps(cls, *args) -> DioramaComputer:
"""Create a DioramaComputer instance from a list of application names.
Args:
*args: Variable number of application names to include in the desktop
Returns:
DioramaComputer: A computer interface for the specified applications
"""
cls._ensure_scheduler()
return cls(args).computer
@@ -34,6 +48,11 @@ class Diorama:
_cursor_positions = {}
def __init__(self, app_list):
"""Initialize a Diorama instance for the specified applications.
Args:
app_list: List of application names to manage
"""
self.app_list = app_list
self.interface = self.Interface(self)
self.computer = DioramaComputer(self)
@@ -48,6 +67,10 @@ class Diorama:
@classmethod
def _ensure_scheduler(cls):
"""Ensure the async scheduler loop is running.
Creates and starts the scheduler task if it hasn't been started yet.
"""
if not cls._scheduler_started:
logger.info("Starting Diorama scheduler loop…")
cls._scheduler_queue = asyncio.Queue()
@@ -57,6 +80,11 @@ class Diorama:
@classmethod
async def _scheduler_loop(cls):
"""Main scheduler loop that processes automation commands.
Continuously processes commands from the scheduler queue, handling
screenshots, mouse actions, keyboard input, and scrolling operations.
"""
while True:
cmd = await cls._scheduler_queue.get()
action = cmd.get("action")
@@ -144,13 +172,33 @@ class Diorama:
future.set_exception(e)
class Interface():
"""Interface for interacting with the virtual desktop.
Provides methods for taking screenshots, mouse interactions, keyboard input,
and coordinate transformations between screenshot and screen coordinates.
"""
def __init__(self, diorama):
"""Initialize the interface with a reference to the parent Diorama instance.
Args:
diorama: The parent Diorama instance
"""
self._diorama = diorama
self._scene_hitboxes = []
self._scene_size = None
async def _send_cmd(self, action, arguments=None):
"""Send a command to the scheduler queue.
Args:
action (str): The action to perform
arguments (dict, optional): Arguments for the action
Returns:
The result of the command execution
"""
Diorama._ensure_scheduler()
loop = asyncio.get_event_loop()
future = loop.create_future()
@@ -167,6 +215,14 @@ class Diorama:
return None
async def screenshot(self, as_bytes: bool = True) -> Union[str, Image.Image]:
"""Take a screenshot of the managed applications.
Args:
as_bytes (bool): If True, return base64-encoded bytes; if False, return PIL Image
Returns:
Union[str, Image.Image]: Base64-encoded PNG bytes or PIL Image object
"""
import base64
result, img = await self._send_cmd("screenshot")
self._scene_hitboxes = result.get("hitboxes", [])
@@ -184,6 +240,12 @@ class Diorama:
return img
async def left_click(self, x, y):
"""Perform a left mouse click at the specified coordinates.
Args:
x (int): X coordinate in screenshot space (or None to use last position)
y (int): Y coordinate in screenshot space (or None to use last position)
"""
# Get last cursor position for this app_list hash
app_list_hash = hash(tuple(sorted(self._diorama.app_list)))
last_pos = Diorama._cursor_positions.get(app_list_hash, (0, 0))
@@ -195,6 +257,12 @@ class Diorama:
await self._send_cmd("left_click", {"x": sx, "y": sy})
async def right_click(self, x, y):
"""Perform a right mouse click at the specified coordinates.
Args:
x (int): X coordinate in screenshot space (or None to use last position)
y (int): Y coordinate in screenshot space (or None to use last position)
"""
# Get last cursor position for this app_list hash
app_list_hash = hash(tuple(sorted(self._diorama.app_list)))
last_pos = Diorama._cursor_positions.get(app_list_hash, (0, 0))
@@ -206,6 +274,12 @@ class Diorama:
await self._send_cmd("right_click", {"x": sx, "y": sy})
async def double_click(self, x, y):
"""Perform a double mouse click at the specified coordinates.
Args:
x (int): X coordinate in screenshot space (or None to use last position)
y (int): Y coordinate in screenshot space (or None to use last position)
"""
# Get last cursor position for this app_list hash
app_list_hash = hash(tuple(sorted(self._diorama.app_list)))
last_pos = Diorama._cursor_positions.get(app_list_hash, (0, 0))
@@ -217,6 +291,12 @@ class Diorama:
await self._send_cmd("double_click", {"x": sx, "y": sy})
async def move_cursor(self, x, y):
"""Move the mouse cursor to the specified coordinates.
Args:
x (int): X coordinate in screenshot space (or None to use last position)
y (int): Y coordinate in screenshot space (or None to use last position)
"""
# Get last cursor position for this app_list hash
app_list_hash = hash(tuple(sorted(self._diorama.app_list)))
last_pos = Diorama._cursor_positions.get(app_list_hash, (0, 0))
@@ -228,6 +308,13 @@ class Diorama:
await self._send_cmd("move_cursor", {"x": sx, "y": sy})
async def drag_to(self, x, y, duration=0.5):
"""Drag the mouse from current position to the specified coordinates.
Args:
x (int): X coordinate in screenshot space (or None to use last position)
y (int): Y coordinate in screenshot space (or None to use last position)
duration (float): Duration of the drag operation in seconds
"""
# Get last cursor position for this app_list hash
app_list_hash = hash(tuple(sorted(self._diorama.app_list)))
last_pos = Diorama._cursor_positions.get(app_list_hash, (0, 0))
@@ -239,18 +326,43 @@ class Diorama:
await self._send_cmd("drag_to", {"x": sx, "y": sy, "duration": duration})
async def get_cursor_position(self):
"""Get the current cursor position in screen coordinates.
Returns:
tuple: (x, y) coordinates of the cursor in screen space
"""
return await self._send_cmd("get_cursor_position")
async def type_text(self, text):
"""Type the specified text using the keyboard.
Args:
text (str): The text to type
"""
await self._send_cmd("type_text", {"text": text})
async def press_key(self, key):
"""Press a single key on the keyboard.
Args:
key (str): The key to press
"""
await self._send_cmd("press_key", {"key": key})
async def hotkey(self, keys):
"""Press a combination of keys simultaneously.
Args:
keys (list): List of keys to press together
"""
await self._send_cmd("hotkey", {"keys": list(keys)})
async def scroll_up(self, clicks: int = 1):
"""Scroll up at the current cursor position.
Args:
clicks (int): Number of scroll clicks to perform
"""
# Get last cursor position for this app_list hash
app_list_hash = hash(tuple(sorted(self._diorama.app_list)))
last_pos = Diorama._cursor_positions.get(app_list_hash, (0, 0))
@@ -259,6 +371,11 @@ class Diorama:
await self._send_cmd("scroll_up", {"clicks": clicks, "x": x, "y": y})
async def scroll_down(self, clicks: int = 1):
"""Scroll down at the current cursor position.
Args:
clicks (int): Number of scroll clicks to perform
"""
# Get last cursor position for this app_list hash
app_list_hash = hash(tuple(sorted(self._diorama.app_list)))
last_pos = Diorama._cursor_positions.get(app_list_hash, (0, 0))
@@ -267,6 +384,11 @@ class Diorama:
await self._send_cmd("scroll_down", {"clicks": clicks, "x": x, "y": y})
async def get_screen_size(self) -> dict[str, int]:
"""Get the size of the screenshot area.
Returns:
dict[str, int]: Dictionary with 'width' and 'height' keys
"""
if not self._scene_size:
await self.screenshot()
return { "width": self._scene_size[0], "height": self._scene_size[1] }
@@ -348,6 +470,7 @@ import pyautogui
import time
async def main():
"""Main function demonstrating Diorama usage with multiple desktops and mouse tracking."""
desktop1 = Diorama.create_from_apps(["Discord", "Notes"])
desktop2 = Diorama.create_from_apps(["Terminal"])

View File

@@ -12,35 +12,96 @@ from .base import BaseFileHandler
import base64
def resolve_path(path: str) -> Path:
"""Resolve a path to its absolute path. Expand ~ to the user's home directory."""
"""Resolve a path to its absolute path. Expand ~ to the user's home directory.
Args:
path: The file or directory path to resolve
Returns:
Path: The resolved absolute path
"""
return Path(path).expanduser().resolve()
class GenericFileHandler(BaseFileHandler):
"""
Generic file handler that provides file system operations for all operating systems.
This class implements the BaseFileHandler interface and provides methods for
file and directory operations including reading, writing, creating, and deleting
files and directories.
"""
async def file_exists(self, path: str) -> Dict[str, Any]:
"""
Check if a file exists at the specified path.
Args:
path: The file path to check
Returns:
Dict containing 'success' boolean and either 'exists' boolean or 'error' string
"""
try:
return {"success": True, "exists": resolve_path(path).is_file()}
except Exception as e:
return {"success": False, "error": str(e)}
async def directory_exists(self, path: str) -> Dict[str, Any]:
"""
Check if a directory exists at the specified path.
Args:
path: The directory path to check
Returns:
Dict containing 'success' boolean and either 'exists' boolean or 'error' string
"""
try:
return {"success": True, "exists": resolve_path(path).is_dir()}
except Exception as e:
return {"success": False, "error": str(e)}
async def list_dir(self, path: str) -> Dict[str, Any]:
"""
List all files and directories in the specified directory.
Args:
path: The directory path to list
Returns:
Dict containing 'success' boolean and either 'files' list of names or 'error' string
"""
try:
return {"success": True, "files": [p.name for p in resolve_path(path).iterdir() if p.is_file() or p.is_dir()]}
except Exception as e:
return {"success": False, "error": str(e)}
async def read_text(self, path: str) -> Dict[str, Any]:
"""
Read the contents of a text file.
Args:
path: The file path to read from
Returns:
Dict containing 'success' boolean and either 'content' string or 'error' string
"""
try:
return {"success": True, "content": resolve_path(path).read_text()}
except Exception as e:
return {"success": False, "error": str(e)}
async def write_text(self, path: str, content: str) -> Dict[str, Any]:
"""
Write text content to a file.
Args:
path: The file path to write to
content: The text content to write
Returns:
Dict containing 'success' boolean and optionally 'error' string
"""
try:
resolve_path(path).write_text(content)
return {"success": True}
@@ -48,6 +109,17 @@ class GenericFileHandler(BaseFileHandler):
return {"success": False, "error": str(e)}
async def write_bytes(self, path: str, content_b64: str, append: bool = False) -> Dict[str, Any]:
"""
Write binary content to a file from base64 encoded string.
Args:
path: The file path to write to
content_b64: Base64 encoded binary content
append: If True, append to existing file; if False, overwrite
Returns:
Dict containing 'success' boolean and optionally 'error' string
"""
try:
mode = 'ab' if append else 'wb'
with open(resolve_path(path), mode) as f:
@@ -57,6 +129,17 @@ class GenericFileHandler(BaseFileHandler):
return {"success": False, "error": str(e)}
async def read_bytes(self, path: str, offset: int = 0, length: Optional[int] = None) -> Dict[str, Any]:
"""
Read binary content from a file and return as base64 encoded string.
Args:
path: The file path to read from
offset: Byte offset to start reading from
length: Number of bytes to read; if None, read entire file from offset
Returns:
Dict containing 'success' boolean and either 'content_b64' string or 'error' string
"""
try:
file_path = resolve_path(path)
with open(file_path, 'rb') as f:
@@ -73,6 +156,15 @@ class GenericFileHandler(BaseFileHandler):
return {"success": False, "error": str(e)}
async def get_file_size(self, path: str) -> Dict[str, Any]:
"""
Get the size of a file in bytes.
Args:
path: The file path to get size for
Returns:
Dict containing 'success' boolean and either 'size' integer or 'error' string
"""
try:
file_path = resolve_path(path)
size = file_path.stat().st_size
@@ -81,6 +173,15 @@ class GenericFileHandler(BaseFileHandler):
return {"success": False, "error": str(e)}
async def delete_file(self, path: str) -> Dict[str, Any]:
"""
Delete a file at the specified path.
Args:
path: The file path to delete
Returns:
Dict containing 'success' boolean and optionally 'error' string
"""
try:
resolve_path(path).unlink()
return {"success": True}
@@ -88,6 +189,18 @@ class GenericFileHandler(BaseFileHandler):
return {"success": False, "error": str(e)}
async def create_dir(self, path: str) -> Dict[str, Any]:
"""
Create a directory at the specified path.
Creates parent directories if they don't exist and doesn't raise an error
if the directory already exists.
Args:
path: The directory path to create
Returns:
Dict containing 'success' boolean and optionally 'error' string
"""
try:
resolve_path(path).mkdir(parents=True, exist_ok=True)
return {"success": True}
@@ -95,6 +208,15 @@ class GenericFileHandler(BaseFileHandler):
return {"success": False, "error": str(e)}
async def delete_dir(self, path: str) -> Dict[str, Any]:
"""
Delete an empty directory at the specified path.
Args:
path: The directory path to delete
Returns:
Dict containing 'success' boolean and optionally 'error' string
"""
try:
resolve_path(path).rmdir()
return {"success": True}

View File

@@ -38,7 +38,12 @@ class LinuxAccessibilityHandler(BaseAccessibilityHandler):
"""Linux implementation of accessibility handler."""
async def get_accessibility_tree(self) -> Dict[str, Any]:
"""Get the accessibility tree of the current window."""
"""Get the accessibility tree of the current window.
Returns:
Dict[str, Any]: A dictionary containing success status and a simulated tree structure
since Linux has no accessibility API equivalent to macOS's.
"""
# Linux has no accessibility API equivalent to macOS's
# Return a minimal dummy tree
logger.info("Getting accessibility tree (simulated, no accessibility API available on Linux)")
@@ -56,7 +61,16 @@ class LinuxAccessibilityHandler(BaseAccessibilityHandler):
async def find_element(self, role: Optional[str] = None,
title: Optional[str] = None,
value: Optional[str] = None) -> Dict[str, Any]:
"""Find an element in the accessibility tree by criteria."""
"""Find an element in the accessibility tree by criteria.
Args:
role: The role of the element to find.
title: The title of the element to find.
value: The value of the element to find.
Returns:
Dict[str, Any]: A dictionary indicating that element search is not supported on Linux.
"""
logger.info(f"Finding element with role={role}, title={title}, value={value} (not supported on Linux)")
return {
"success": False,
@@ -64,7 +78,12 @@ class LinuxAccessibilityHandler(BaseAccessibilityHandler):
}
def get_cursor_position(self) -> Tuple[int, int]:
"""Get the current cursor position."""
"""Get the current cursor position.
Returns:
Tuple[int, int]: The x and y coordinates of the cursor position.
Returns (0, 0) if pyautogui is not available.
"""
try:
pos = pyautogui.position()
return pos.x, pos.y
@@ -75,7 +94,12 @@ class LinuxAccessibilityHandler(BaseAccessibilityHandler):
return 0, 0
def get_screen_size(self) -> Tuple[int, int]:
"""Get the screen size."""
"""Get the screen size.
Returns:
Tuple[int, int]: The width and height of the screen in pixels.
Returns (1920, 1080) if pyautogui is not available.
"""
try:
size = pyautogui.size()
return size.width, size.height
@@ -92,6 +116,16 @@ class LinuxAutomationHandler(BaseAutomationHandler):
# Mouse Actions
async def mouse_down(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
"""Press and hold a mouse button at the specified coordinates.
Args:
x: The x coordinate to move to before pressing. If None, uses current position.
y: The y coordinate to move to before pressing. If None, uses current position.
button: The mouse button to press ("left", "right", or "middle").
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
if x is not None and y is not None:
pyautogui.moveTo(x, y)
@@ -101,6 +135,16 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def mouse_up(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
"""Release a mouse button at the specified coordinates.
Args:
x: The x coordinate to move to before releasing. If None, uses current position.
y: The y coordinate to move to before releasing. If None, uses current position.
button: The mouse button to release ("left", "right", or "middle").
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
if x is not None and y is not None:
pyautogui.moveTo(x, y)
@@ -110,6 +154,15 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def move_cursor(self, x: int, y: int) -> Dict[str, Any]:
"""Move the cursor to the specified coordinates.
Args:
x: The x coordinate to move to.
y: The y coordinate to move to.
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
pyautogui.moveTo(x, y)
return {"success": True}
@@ -117,6 +170,15 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def left_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
"""Perform a left mouse click at the specified coordinates.
Args:
x: The x coordinate to click at. If None, clicks at current position.
y: The y coordinate to click at. If None, clicks at current position.
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
if x is not None and y is not None:
pyautogui.moveTo(x, y)
@@ -126,6 +188,15 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def right_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
"""Perform a right mouse click at the specified coordinates.
Args:
x: The x coordinate to click at. If None, clicks at current position.
y: The y coordinate to click at. If None, clicks at current position.
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
if x is not None and y is not None:
pyautogui.moveTo(x, y)
@@ -135,6 +206,15 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def double_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
"""Perform a double click at the specified coordinates.
Args:
x: The x coordinate to double click at. If None, clicks at current position.
y: The y coordinate to double click at. If None, clicks at current position.
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
if x is not None and y is not None:
pyautogui.moveTo(x, y)
@@ -144,6 +224,16 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def click(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
"""Perform a mouse click with the specified button at the given coordinates.
Args:
x: The x coordinate to click at. If None, clicks at current position.
y: The y coordinate to click at. If None, clicks at current position.
button: The mouse button to click ("left", "right", or "middle").
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
if x is not None and y is not None:
pyautogui.moveTo(x, y)
@@ -153,6 +243,17 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def drag_to(self, x: int, y: int, button: str = "left", duration: float = 0.5) -> Dict[str, Any]:
"""Drag from the current position to the specified coordinates.
Args:
x: The x coordinate to drag to.
y: The y coordinate to drag to.
button: The mouse button to use for dragging.
duration: The time in seconds to take for the drag operation.
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
pyautogui.dragTo(x, y, duration=duration, button=button)
return {"success": True}
@@ -160,6 +261,18 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def drag(self, start_x: int, start_y: int, end_x: int, end_y: int, button: str = "left") -> Dict[str, Any]:
"""Drag from start coordinates to end coordinates.
Args:
start_x: The starting x coordinate.
start_y: The starting y coordinate.
end_x: The ending x coordinate.
end_y: The ending y coordinate.
button: The mouse button to use for dragging.
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
pyautogui.moveTo(start_x, start_y)
pyautogui.dragTo(end_x, end_y, duration=0.5, button=button)
@@ -168,6 +281,16 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def drag_path(self, path: List[Tuple[int, int]], button: str = "left", duration: float = 0.5) -> Dict[str, Any]:
"""Drag along a path defined by a list of coordinates.
Args:
path: A list of (x, y) coordinate tuples defining the drag path.
button: The mouse button to use for dragging.
duration: The time in seconds to take for each segment of the drag.
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
if not path:
return {"success": False, "error": "Path is empty"}
@@ -180,6 +303,14 @@ class LinuxAutomationHandler(BaseAutomationHandler):
# Keyboard Actions
async def key_down(self, key: str) -> Dict[str, Any]:
"""Press and hold a key.
Args:
key: The key to press down.
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
pyautogui.keyDown(key)
return {"success": True}
@@ -187,6 +318,14 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def key_up(self, key: str) -> Dict[str, Any]:
"""Release a key.
Args:
key: The key to release.
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
pyautogui.keyUp(key)
return {"success": True}
@@ -194,6 +333,14 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def type_text(self, text: str) -> Dict[str, Any]:
"""Type the specified text using the keyboard.
Args:
text: The text to type.
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
# use pynput for Unicode support
self.keyboard.type(text)
@@ -202,6 +349,14 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def press_key(self, key: str) -> Dict[str, Any]:
"""Press and release a key.
Args:
key: The key to press.
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
pyautogui.press(key)
return {"success": True}
@@ -209,6 +364,14 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def hotkey(self, keys: List[str]) -> Dict[str, Any]:
"""Press a combination of keys simultaneously.
Args:
keys: A list of keys to press together as a hotkey combination.
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
pyautogui.hotkey(*keys)
return {"success": True}
@@ -217,6 +380,15 @@ class LinuxAutomationHandler(BaseAutomationHandler):
# Scrolling Actions
async def scroll(self, x: int, y: int) -> Dict[str, Any]:
"""Scroll the mouse wheel.
Args:
x: The horizontal scroll amount.
y: The vertical scroll amount.
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
self.mouse.scroll(x, y)
return {"success": True}
@@ -224,6 +396,14 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def scroll_down(self, clicks: int = 1) -> Dict[str, Any]:
"""Scroll down by the specified number of clicks.
Args:
clicks: The number of scroll clicks to perform downward.
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
pyautogui.scroll(-clicks)
return {"success": True}
@@ -231,6 +411,14 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def scroll_up(self, clicks: int = 1) -> Dict[str, Any]:
"""Scroll up by the specified number of clicks.
Args:
clicks: The number of scroll clicks to perform upward.
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
pyautogui.scroll(clicks)
return {"success": True}
@@ -239,6 +427,12 @@ class LinuxAutomationHandler(BaseAutomationHandler):
# Screen Actions
async def screenshot(self) -> Dict[str, Any]:
"""Take a screenshot of the current screen.
Returns:
Dict[str, Any]: A dictionary containing success status and base64-encoded image data,
or error message if failed.
"""
try:
from PIL import Image
screenshot = pyautogui.screenshot()
@@ -253,6 +447,12 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": f"Screenshot error: {str(e)}"}
async def get_screen_size(self) -> Dict[str, Any]:
"""Get the size of the screen.
Returns:
Dict[str, Any]: A dictionary containing success status and screen dimensions,
or error message if failed.
"""
try:
size = pyautogui.size()
return {"success": True, "size": {"width": size.width, "height": size.height}}
@@ -260,6 +460,12 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def get_cursor_position(self) -> Dict[str, Any]:
"""Get the current position of the cursor.
Returns:
Dict[str, Any]: A dictionary containing success status and cursor coordinates,
or error message if failed.
"""
try:
pos = pyautogui.position()
return {"success": True, "position": {"x": pos.x, "y": pos.y}}
@@ -268,6 +474,12 @@ class LinuxAutomationHandler(BaseAutomationHandler):
# Clipboard Actions
async def copy_to_clipboard(self) -> Dict[str, Any]:
"""Get the current content of the clipboard.
Returns:
Dict[str, Any]: A dictionary containing success status and clipboard content,
or error message if failed.
"""
try:
import pyperclip
content = pyperclip.paste()
@@ -276,6 +488,14 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
async def set_clipboard(self, text: str) -> Dict[str, Any]:
"""Set the clipboard content to the specified text.
Args:
text: The text to copy to the clipboard.
Returns:
Dict[str, Any]: A dictionary with success status and error message if failed.
"""
try:
import pyperclip
pyperclip.copy(text)
@@ -285,6 +505,15 @@ class LinuxAutomationHandler(BaseAutomationHandler):
# Command Execution
async def run_command(self, command: str) -> Dict[str, Any]:
"""Execute a shell command asynchronously.
Args:
command: The shell command to execute.
Returns:
Dict[str, Any]: A dictionary containing success status, stdout, stderr,
and return code, or error message if failed.
"""
try:
# Create subprocess
process = await asyncio.create_subprocess_shell(
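The command runner returns the same result-dict shape; a consumer sketch (the stdout key name is assumed from the docstring):

import asyncio

async def demo() -> None:
    handler = LinuxAutomationHandler()
    res = await handler.run_command("echo hello")
    if res["success"]:
        print(res["stdout"].strip())  # assumed key per the docstring

asyncio.run(demo())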

View File

@@ -3,6 +3,12 @@ import re
from pydantic import BaseModel, Field, computed_field, validator, ConfigDict, RootModel
class DiskInfo(BaseModel):
"""Information about disk storage allocation.
Attributes:
total: Total disk space in bytes
allocated: Currently allocated disk space in bytes
"""
total: int
allocated: int
@@ -10,6 +16,15 @@ class VMConfig(BaseModel):
"""Configuration for creating a new VM.
Note: Memory and disk sizes should be specified with units (e.g., "4GB", "64GB")
Attributes:
name: Name of the virtual machine
os: Operating system type, either "macOS" or "linux"
cpu: Number of CPU cores to allocate
memory: Amount of memory to allocate with units
disk_size: Size of the disk to create with units
display: Display resolution in format "widthxheight"
ipsw: IPSW path or 'latest' for macOS VMs, None for other OS types
"""
name: str
os: Literal["macOS", "linux"] = "macOS"
@@ -23,7 +38,12 @@ class VMConfig(BaseModel):
populate_by_alias = True
class SharedDirectory(BaseModel):
"""Configuration for a shared directory."""
"""Configuration for a shared directory.
Attributes:
host_path: Path to the directory on the host system
read_only: Whether the directory should be mounted as read-only
"""
host_path: str = Field(..., alias="hostPath") # Allow host_path but serialize as hostPath
read_only: bool = False
@@ -50,6 +70,16 @@ class VMRunOpts(BaseModel):
)
def model_dump(self, **kwargs):
"""Export model data with proper field name conversion.
Converts shared directory fields to match API expectations when using aliases.
Args:
**kwargs: Keyword arguments passed to parent model_dump method
Returns:
dict: Model data with properly formatted field names
"""
data = super().model_dump(**kwargs)
# Convert shared directory fields to match API expectations
if self.shared_directories and "by_alias" in kwargs and kwargs["by_alias"]:
@@ -65,6 +95,18 @@ class VMRunOpts(BaseModel):
return data
class VMStatus(BaseModel):
"""Status information for a virtual machine.
Attributes:
name: Name of the virtual machine
status: Current status of the VM
os: Operating system type
cpu_count: Number of CPU cores allocated
memory_size: Amount of memory allocated in bytes
disk_size: Disk storage information
vnc_url: URL for VNC connection if available
ip_address: IP address of the VM if available
"""
name: str
status: str
os: Literal["macOS", "linux"]
@@ -80,38 +122,79 @@ class VMStatus(BaseModel):
@computed_field
@property
def state(self) -> str:
"""Get the current state of the VM.
Returns:
str: Current VM status
"""
return self.status
@computed_field
@property
def cpu(self) -> int:
"""Get the number of CPU cores.
Returns:
int: Number of CPU cores allocated to the VM
"""
return self.cpu_count
@computed_field
@property
def memory(self) -> str:
"""Get memory allocation in human-readable format.
Returns:
str: Memory size formatted as "{size}GB"
"""
# Convert bytes to GB
gb = self.memory_size / (1024 * 1024 * 1024)
return f"{int(gb)}GB"
class VMUpdateOpts(BaseModel):
"""Options for updating VM configuration.
Attributes:
cpu: Number of CPU cores to update to
memory: Amount of memory to update to with units
disk_size: Size of disk to update to with units
"""
cpu: Optional[int] = None
memory: Optional[str] = None
disk_size: Optional[str] = None
class ImageRef(BaseModel):
"""Reference to a VM image."""
"""Reference to a VM image.
Attributes:
image: Name of the image
tag: Tag version of the image
registry: Registry hostname where the image is stored
organization: Organization or namespace in the registry
"""
image: str
tag: str = "latest"
registry: Optional[str] = "ghcr.io"
organization: Optional[str] = "trycua"
def model_dump(self, **kwargs):
"""Override model_dump to return just the image:tag format."""
"""Override model_dump to return just the image:tag format.
Args:
**kwargs: Keyword arguments (ignored)
Returns:
str: Image reference in "image:tag" format
"""
return f"{self.image}:{self.tag}"
class CloneSpec(BaseModel):
"""Specification for cloning a VM."""
"""Specification for cloning a VM.
Attributes:
name: Name of the source VM to clone
new_name: Name for the new cloned VM
"""
name: str
new_name: str = Field(alias="newName")
@@ -119,18 +202,44 @@ class CloneSpec(BaseModel):
populate_by_alias = True
class ImageInfo(BaseModel):
"""Model for individual image information."""
"""Model for individual image information.
Attributes:
imageId: Unique identifier for the image
"""
imageId: str
class ImageList(RootModel):
"""Response model for the images endpoint."""
"""Response model for the images endpoint.
A list-like container for ImageInfo objects that provides
iteration and indexing capabilities.
"""
root: List[ImageInfo]
def __iter__(self):
"""Iterate over the image list.
Returns:
Iterator over ImageInfo objects
"""
return iter(self.root)
def __getitem__(self, item):
"""Get an item from the image list by index.
Args:
item: Index or slice to retrieve
Returns:
ImageInfo or list of ImageInfo objects
"""
return self.root[item]
def __len__(self):
return len(self.root)
"""Get the number of images in the list.
Returns:
int: Number of images in the list
"""
return len(self.root)
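A quick sketch exercising the models above (names and ids are placeholders):

ref = ImageRef(image="example-image", tag="v1")
print(ref.model_dump())            # "example-image:v1"

opts = VMUpdateOpts(cpu=4, memory="8GB")  # all fields optional

imgs = ImageList(root=[ImageInfo(imageId="abc123")])
print(len(imgs), imgs[0].imageId)  # 1 abc123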