Format #423 with uv run pre-commit run --all-files

This commit is contained in:
James Murdza
2025-10-31 10:14:18 -07:00
parent 870bf1263c
commit 097f6f92af
7 changed files with 537 additions and 495 deletions

View File

@@ -1,10 +1,4 @@
import asyncio
from .models import Computer as ComputerConfig, Display
from .interface.factory import InterfaceFactory
from .tracing import ComputerTracing
from .tracing_wrapper import TracingInterfaceWrapper
import time
from PIL import Image
import io
import json
import logging
@@ -23,6 +17,8 @@ from .interface.factory import InterfaceFactory
from .logger import Logger, LogLevel
from .models import Computer as ComputerConfig
from .models import Display
from .tracing import ComputerTracing
from .tracing_wrapper import TracingInterfaceWrapper
SYSTEM_INFO = {
"os": platform.system().lower(),
@@ -217,7 +213,7 @@ class Computer:
self._original_interface = None # Keep reference to original interface
self._tracing_wrapper = None # Tracing wrapper for interface
self.use_host_computer_server = use_host_computer_server
# Initialize tracing
self._tracing = ComputerTracing(self)
@@ -522,7 +518,7 @@ class Computer:
os=self.os_type, ip_address=ip_address
),
)
self._interface = interface
self._original_interface = interface
@@ -893,16 +889,22 @@ class Computer:
raise RuntimeError(error_msg)
# Return tracing wrapper if tracing is active and we have an original interface
if (self._tracing.is_tracing and
hasattr(self, "_original_interface") and
self._original_interface is not None):
if (
self._tracing.is_tracing
and hasattr(self, "_original_interface")
and self._original_interface is not None
):
# Create wrapper if it doesn't exist or if the original interface changed
if (not hasattr(self, "_tracing_wrapper") or
self._tracing_wrapper is None or
self._tracing_wrapper._original_interface != self._original_interface):
self._tracing_wrapper = TracingInterfaceWrapper(self._original_interface, self._tracing)
if (
not hasattr(self, "_tracing_wrapper")
or self._tracing_wrapper is None
or self._tracing_wrapper._original_interface != self._original_interface
):
self._tracing_wrapper = TracingInterfaceWrapper(
self._original_interface, self._tracing
)
return self._tracing_wrapper
return self._interface
@property

View File

@@ -6,6 +6,8 @@ allowing users to record computer interactions for debugging, training, and anal
"""
import asyncio
import base64
import io
import json
import time
import uuid
@@ -13,23 +15,22 @@ import zipfile
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from PIL import Image
import io
import base64
class ComputerTracing:
"""
Computer tracing class that records computer interactions and saves them to disk.
This class provides a flexible API for recording computer sessions with configurable
options for what to record (screenshots, API calls, video, etc.).
"""
def __init__(self, computer_instance):
"""
Initialize the tracing instance.
Args:
computer_instance: The Computer instance to trace
"""
@@ -41,16 +42,16 @@ class ComputerTracing:
self._trace_id: Optional[str] = None
self._trace_dir: Optional[Path] = None
self._screenshot_count = 0
@property
def is_tracing(self) -> bool:
    """Report whether a trace session is currently being recorded."""
    tracing_active = self._is_tracing
    return tracing_active
async def start(self, config: Optional[Dict[str, Any]] = None) -> None:
"""
Start tracing with the specified configuration.
Args:
config: Tracing configuration dict with options:
- video: bool - Record video frames (default: False)
@@ -63,109 +64,122 @@ class ComputerTracing:
"""
if self._is_tracing:
raise RuntimeError("Tracing is already active. Call stop() first.")
# Set default configuration
default_config = {
'video': False,
'screenshots': True,
'api_calls': True,
'accessibility_tree': False,
'metadata': True,
'name': None,
'path': None
"video": False,
"screenshots": True,
"api_calls": True,
"accessibility_tree": False,
"metadata": True,
"name": None,
"path": None,
}
self._trace_config = {**default_config, **(config or {})}
# Generate trace ID and directory
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
self._trace_id = self._trace_config.get('name') or f"trace_{timestamp}_{str(uuid.uuid4())[:8]}"
if self._trace_config.get('path'):
self._trace_dir = Path(self._trace_config['path'])
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
self._trace_id = (
self._trace_config.get("name") or f"trace_{timestamp}_{str(uuid.uuid4())[:8]}"
)
if self._trace_config.get("path"):
self._trace_dir = Path(self._trace_config["path"])
else:
self._trace_dir = Path.cwd() / "traces" / self._trace_id
# Create trace directory
self._trace_dir.mkdir(parents=True, exist_ok=True)
# Initialize trace data
self._trace_data = []
self._trace_start_time = time.time()
self._screenshot_count = 0
self._is_tracing = True
# Record initial metadata
await self._record_event('trace_start', {
'trace_id': self._trace_id,
'config': self._trace_config,
'timestamp': self._trace_start_time,
'computer_info': {
'os_type': self._computer.os_type,
'provider_type': str(self._computer.provider_type),
'image': self._computer.image
}
})
await self._record_event(
"trace_start",
{
"trace_id": self._trace_id,
"config": self._trace_config,
"timestamp": self._trace_start_time,
"computer_info": {
"os_type": self._computer.os_type,
"provider_type": str(self._computer.provider_type),
"image": self._computer.image,
},
},
)
# Take initial screenshot if enabled
if self._trace_config.get('screenshots'):
await self._take_screenshot('initial_screenshot')
if self._trace_config.get("screenshots"):
await self._take_screenshot("initial_screenshot")
async def stop(self, options: Optional[Dict[str, Any]] = None) -> str:
"""
Stop tracing and save the trace data.
Args:
options: Stop options dict with:
- path: str - Custom output path for the trace archive
- format: str - Output format ('zip' or 'dir', default: 'zip')
Returns:
str: Path to the saved trace file or directory
"""
if not self._is_tracing:
raise RuntimeError("Tracing is not active. Call start() first.")
if self._trace_start_time is None or self._trace_dir is None or self._trace_id is None:
raise RuntimeError("Tracing state is invalid.")
# Record final metadata
await self._record_event('trace_end', {
'timestamp': time.time(),
'duration': time.time() - self._trace_start_time,
'total_events': len(self._trace_data),
'screenshot_count': self._screenshot_count
})
await self._record_event(
"trace_end",
{
"timestamp": time.time(),
"duration": time.time() - self._trace_start_time,
"total_events": len(self._trace_data),
"screenshot_count": self._screenshot_count,
},
)
# Take final screenshot if enabled
if self._trace_config.get('screenshots'):
await self._take_screenshot('final_screenshot')
if self._trace_config.get("screenshots"):
await self._take_screenshot("final_screenshot")
# Save trace metadata
metadata_path = self._trace_dir / "trace_metadata.json"
with open(metadata_path, 'w') as f:
json.dump({
'trace_id': self._trace_id,
'config': self._trace_config,
'start_time': self._trace_start_time,
'end_time': time.time(),
'duration': time.time() - self._trace_start_time,
'total_events': len(self._trace_data),
'screenshot_count': self._screenshot_count,
'events': self._trace_data
}, f, indent=2, default=str)
with open(metadata_path, "w") as f:
json.dump(
{
"trace_id": self._trace_id,
"config": self._trace_config,
"start_time": self._trace_start_time,
"end_time": time.time(),
"duration": time.time() - self._trace_start_time,
"total_events": len(self._trace_data),
"screenshot_count": self._screenshot_count,
"events": self._trace_data,
},
f,
indent=2,
default=str,
)
# Determine output format and path
output_format = options.get('format', 'zip') if options else 'zip'
custom_path = options.get('path') if options else None
if output_format == 'zip':
output_format = options.get("format", "zip") if options else "zip"
custom_path = options.get("path") if options else None
if output_format == "zip":
# Create zip file
if custom_path:
zip_path = Path(custom_path)
else:
zip_path = self._trace_dir.parent / f"{self._trace_id}.zip"
await self._create_zip_archive(zip_path)
output_path = str(zip_path)
else:
@@ -175,12 +189,13 @@ class ComputerTracing:
custom_dir = Path(custom_path)
if custom_dir.exists():
import shutil
shutil.rmtree(custom_dir)
self._trace_dir.rename(custom_dir)
output_path = str(custom_dir)
else:
output_path = str(self._trace_dir)
# Reset tracing state
self._is_tracing = False
self._trace_config = {}
@@ -188,138 +203,153 @@ class ComputerTracing:
self._trace_start_time = None
self._trace_id = None
self._screenshot_count = 0
return output_path
async def _record_event(self, event_type: str, data: Dict[str, Any]) -> None:
"""
Record a trace event.
Args:
event_type: Type of event (e.g., 'click', 'type', 'screenshot')
data: Event data
"""
if not self._is_tracing or self._trace_start_time is None or self._trace_dir is None:
return
event = {
'type': event_type,
'timestamp': time.time(),
'relative_time': time.time() - self._trace_start_time,
'data': data
"type": event_type,
"timestamp": time.time(),
"relative_time": time.time() - self._trace_start_time,
"data": data,
}
self._trace_data.append(event)
# Save event to individual file for large traces
event_file = self._trace_dir / f"event_{len(self._trace_data):06d}_{event_type}.json"
with open(event_file, 'w') as f:
with open(event_file, "w") as f:
json.dump(event, f, indent=2, default=str)
async def _take_screenshot(self, name: str = 'screenshot') -> Optional[str]:
async def _take_screenshot(self, name: str = "screenshot") -> Optional[str]:
"""
Take a screenshot and save it to the trace.
Args:
name: Name for the screenshot
Returns:
Optional[str]: Path to the saved screenshot, or None if screenshots disabled
"""
if not self._trace_config.get('screenshots') or not self._computer.interface or self._trace_dir is None:
if (
not self._trace_config.get("screenshots")
or not self._computer.interface
or self._trace_dir is None
):
return None
try:
screenshot_bytes = await self._computer.interface.screenshot()
self._screenshot_count += 1
screenshot_filename = f"{self._screenshot_count:06d}_{name}.png"
screenshot_path = self._trace_dir / screenshot_filename
with open(screenshot_path, 'wb') as f:
with open(screenshot_path, "wb") as f:
f.write(screenshot_bytes)
return str(screenshot_path)
except Exception as e:
# Log error but don't fail the trace
if hasattr(self._computer, 'logger'):
if hasattr(self._computer, "logger"):
self._computer.logger.warning(f"Failed to take screenshot: {e}")
return None
async def _create_zip_archive(self, zip_path: Path) -> None:
"""
Create a zip archive of the trace directory.
Args:
zip_path: Path where to save the zip file
"""
if self._trace_dir is None:
raise RuntimeError("Trace directory is not set")
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for file_path in self._trace_dir.rglob('*'):
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
for file_path in self._trace_dir.rglob("*"):
if file_path.is_file():
arcname = file_path.relative_to(self._trace_dir)
zipf.write(file_path, arcname)
async def record_api_call(
    self,
    method: str,
    args: Dict[str, Any],
    result: Any = None,
    error: Optional[Exception] = None,
) -> None:
    """
    Store one interface call as an ``api_call`` trace event.

    For click and keyboard actions a follow-up screenshot and/or accessibility
    tree snapshot is captured first, depending on the trace configuration.
    Nothing is recorded when the ``api_calls`` option is off.

    Args:
        method: The method name that was called
        args: Arguments passed to the method
        result: Result returned by the method (stored in its string form)
        error: Exception raised by the method, if any
    """
    if not self._trace_config.get("api_calls"):
        return

    # Actions after which the state of the screen is captured
    captures_state = method in (
        "left_click",
        "right_click",
        "double_click",
        "type_text",
        "press_key",
        "hotkey",
    )

    screenshot_path = None
    if captures_state and self._trace_config.get("screenshots"):
        screenshot_path = await self._take_screenshot(f"after_{method}")

    if captures_state and self._trace_config.get("accessibility_tree"):
        await self.record_accessibility_tree()

    if result is not None:
        result_text = str(result)
    else:
        result_text = None
    error_text = str(error) if error else None

    payload = {
        "method": method,
        "args": args,
        "result": result_text,
        "error": error_text,
        "screenshot": screenshot_path,
        "success": error is None,
    }
    await self._record_event("api_call", payload)
async def record_accessibility_tree(self) -> None:
    """
    Capture the accessibility tree from the interface as a trace event.

    Nothing is recorded when the ``accessibility_tree`` option is off or no
    interface is available. A failure while capturing or recording is logged
    as a warning when the computer has a logger, and is otherwise ignored.
    """
    tree_enabled = self._trace_config.get("accessibility_tree")
    if not (tree_enabled and self._computer.interface):
        return

    try:
        snapshot = await self._computer.interface.get_accessibility_tree()
        await self._record_event("accessibility_tree", {"tree": snapshot})
    except Exception as e:
        # Best effort: a missing snapshot must not interrupt the traced session
        if hasattr(self._computer, "logger"):
            self._computer.logger.warning(f"Failed to record accessibility tree: {e}")
async def add_metadata(self, key: str, value: Any) -> None:
    """
    Attach a custom key/value pair to the trace as a ``metadata`` event.

    The call is a no-op when the ``metadata`` option of the trace
    configuration is off.

    Args:
        key: Metadata key
        value: Metadata value
    """
    if self._trace_config.get("metadata"):
        entry = {"key": key, "value": value}
        await self._record_event("metadata", entry)

View File

@@ -3,6 +3,7 @@ Tracing wrapper for computer interface that records API calls.
"""
from typing import Any, Dict, List, Optional, Tuple
from .interface.base import BaseComputerInterface
@@ -10,28 +11,34 @@ class TracingInterfaceWrapper:
"""
Wrapper class that intercepts computer interface calls and records them for tracing.
"""
def __init__(self, original_interface: BaseComputerInterface, tracing_instance):
    """
    Bind the wrapper to an interface and to the tracer that records its calls.

    Args:
        original_interface: The computer interface that the wrapper delegates to
        tracing_instance: The ComputerTracing instance that receives the recorded calls
    """
    self._tracing = tracing_instance
    self._original_interface = original_interface
def __getattr__(self, name):
"""
Delegate attribute access to the original interface if not found in wrapper.
"""
return getattr(self._original_interface, name)
async def _record_call(self, method_name: str, args: Dict[str, Any], result: Any = None, error: Optional[Exception] = None):
async def _record_call(
self,
method_name: str,
args: Dict[str, Any],
result: Any = None,
error: Optional[Exception] = None,
):
"""
Record an API call for tracing.
Args:
method_name: Name of the method called
args: Arguments passed to the method
@@ -40,11 +47,13 @@ class TracingInterfaceWrapper:
"""
if self._tracing.is_tracing:
await self._tracing.record_api_call(method_name, args, result, error)
# Mouse Actions
async def left_click(self, x: Optional[int] = None, y: Optional[int] = None, delay: Optional[float] = None) -> None:
async def left_click(
self, x: Optional[int] = None, y: Optional[int] = None, delay: Optional[float] = None
) -> None:
"""Perform a left mouse button click."""
args = {'x': x, 'y': y, 'delay': delay}
args = {"x": x, "y": y, "delay": delay}
error = None
try:
result = await self._original_interface.left_click(x, y, delay)
@@ -53,11 +62,13 @@ class TracingInterfaceWrapper:
error = e
raise
finally:
await self._record_call('left_click', args, None, error)
await self._record_call("left_click", args, None, error)
async def right_click(self, x: Optional[int] = None, y: Optional[int] = None, delay: Optional[float] = None) -> None:
async def right_click(
self, x: Optional[int] = None, y: Optional[int] = None, delay: Optional[float] = None
) -> None:
"""Perform a right mouse button click."""
args = {'x': x, 'y': y, 'delay': delay}
args = {"x": x, "y": y, "delay": delay}
error = None
try:
result = await self._original_interface.right_click(x, y, delay)
@@ -66,11 +77,13 @@ class TracingInterfaceWrapper:
error = e
raise
finally:
await self._record_call('right_click', args, None, error)
await self._record_call("right_click", args, None, error)
async def double_click(self, x: Optional[int] = None, y: Optional[int] = None, delay: Optional[float] = None) -> None:
async def double_click(
self, x: Optional[int] = None, y: Optional[int] = None, delay: Optional[float] = None
) -> None:
"""Perform a double left mouse button click."""
args = {'x': x, 'y': y, 'delay': delay}
args = {"x": x, "y": y, "delay": delay}
error = None
try:
result = await self._original_interface.double_click(x, y, delay)
@@ -79,11 +92,11 @@ class TracingInterfaceWrapper:
error = e
raise
finally:
await self._record_call('double_click', args, None, error)
await self._record_call("double_click", args, None, error)
async def move_cursor(self, x: int, y: int, delay: Optional[float] = None) -> None:
"""Move the cursor to the specified screen coordinates."""
args = {'x': x, 'y': y, 'delay': delay}
args = {"x": x, "y": y, "delay": delay}
error = None
try:
result = await self._original_interface.move_cursor(x, y, delay)
@@ -92,11 +105,18 @@ class TracingInterfaceWrapper:
error = e
raise
finally:
await self._record_call('move_cursor', args, None, error)
await self._record_call("move_cursor", args, None, error)
async def drag_to(self, x: int, y: int, button: str = "left", duration: float = 0.5, delay: Optional[float] = None) -> None:
async def drag_to(
self,
x: int,
y: int,
button: str = "left",
duration: float = 0.5,
delay: Optional[float] = None,
) -> None:
"""Drag from current position to specified coordinates."""
args = {'x': x, 'y': y, 'button': button, 'duration': duration, 'delay': delay}
args = {"x": x, "y": y, "button": button, "duration": duration, "delay": delay}
error = None
try:
result = await self._original_interface.drag_to(x, y, button, duration, delay)
@@ -105,11 +125,17 @@ class TracingInterfaceWrapper:
error = e
raise
finally:
await self._record_call('drag_to', args, None, error)
await self._record_call("drag_to", args, None, error)
async def drag(self, path: List[Tuple[int, int]], button: str = "left", duration: float = 0.5, delay: Optional[float] = None) -> None:
async def drag(
self,
path: List[Tuple[int, int]],
button: str = "left",
duration: float = 0.5,
delay: Optional[float] = None,
) -> None:
"""Drag the cursor along a path of coordinates."""
args = {'path': path, 'button': button, 'duration': duration, 'delay': delay}
args = {"path": path, "button": button, "duration": duration, "delay": delay}
error = None
try:
result = await self._original_interface.drag(path, button, duration, delay)
@@ -118,12 +144,12 @@ class TracingInterfaceWrapper:
error = e
raise
finally:
await self._record_call('drag', args, None, error)
await self._record_call("drag", args, None, error)
# Keyboard Actions
async def key_down(self, key: str, delay: Optional[float] = None) -> None:
"""Press and hold a key."""
args = {'key': key, 'delay': delay}
args = {"key": key, "delay": delay}
error = None
try:
result = await self._original_interface.key_down(key, delay)
@@ -132,11 +158,11 @@ class TracingInterfaceWrapper:
error = e
raise
finally:
await self._record_call('key_down', args, None, error)
await self._record_call("key_down", args, None, error)
async def key_up(self, key: str, delay: Optional[float] = None) -> None:
"""Release a previously pressed key."""
args = {'key': key, 'delay': delay}
args = {"key": key, "delay": delay}
error = None
try:
result = await self._original_interface.key_up(key, delay)
@@ -145,11 +171,11 @@ class TracingInterfaceWrapper:
error = e
raise
finally:
await self._record_call('key_up', args, None, error)
await self._record_call("key_up", args, None, error)
async def type_text(self, text: str, delay: Optional[float] = None) -> None:
"""Type the specified text string."""
args = {'text': text, 'delay': delay}
args = {"text": text, "delay": delay}
error = None
try:
result = await self._original_interface.type_text(text, delay)
@@ -158,11 +184,11 @@ class TracingInterfaceWrapper:
error = e
raise
finally:
await self._record_call('type_text', args, None, error)
await self._record_call("type_text", args, None, error)
async def press_key(self, key: str, delay: Optional[float] = None) -> None:
"""Press and release a single key."""
args = {'key': key, 'delay': delay}
args = {"key": key, "delay": delay}
error = None
try:
result = await self._original_interface.press_key(key, delay)
@@ -171,11 +197,11 @@ class TracingInterfaceWrapper:
error = e
raise
finally:
await self._record_call('press_key', args, None, error)
await self._record_call("press_key", args, None, error)
async def hotkey(self, *keys: str, delay: Optional[float] = None) -> None:
"""Press multiple keys simultaneously (keyboard shortcut)."""
args = {'keys': keys, 'delay': delay}
args = {"keys": keys, "delay": delay}
error = None
try:
result = await self._original_interface.hotkey(*keys, delay=delay)
@@ -184,12 +210,12 @@ class TracingInterfaceWrapper:
error = e
raise
finally:
await self._record_call('hotkey', args, None, error)
await self._record_call("hotkey", args, None, error)
# Scrolling Actions
async def scroll(self, x: int, y: int, delay: Optional[float] = None) -> None:
"""Scroll the mouse wheel by specified amounts."""
args = {'x': x, 'y': y, 'delay': delay}
args = {"x": x, "y": y, "delay": delay}
error = None
try:
result = await self._original_interface.scroll(x, y, delay)
@@ -198,11 +224,11 @@ class TracingInterfaceWrapper:
error = e
raise
finally:
await self._record_call('scroll', args, None, error)
await self._record_call("scroll", args, None, error)
async def scroll_down(self, clicks: int = 1, delay: Optional[float] = None) -> None:
"""Scroll down by the specified number of clicks."""
args = {'clicks': clicks, 'delay': delay}
args = {"clicks": clicks, "delay": delay}
error = None
try:
result = await self._original_interface.scroll_down(clicks, delay)
@@ -211,11 +237,11 @@ class TracingInterfaceWrapper:
error = e
raise
finally:
await self._record_call('scroll_down', args, None, error)
await self._record_call("scroll_down", args, None, error)
async def scroll_up(self, clicks: int = 1, delay: Optional[float] = None) -> None:
"""Scroll up by the specified number of clicks."""
args = {'clicks': clicks, 'delay': delay}
args = {"clicks": clicks, "delay": delay}
error = None
try:
result = await self._original_interface.scroll_up(clicks, delay)
@@ -224,7 +250,7 @@ class TracingInterfaceWrapper:
error = e
raise
finally:
await self._record_call('scroll_up', args, None, error)
await self._record_call("scroll_up", args, None, error)
# Screen Actions
async def screenshot(self) -> bytes:
@@ -240,7 +266,9 @@ class TracingInterfaceWrapper:
raise
finally:
# For screenshots, we don't want to include the raw bytes in the trace args
await self._record_call('screenshot', args, 'screenshot_taken' if result else None, error)
await self._record_call(
"screenshot", args, "screenshot_taken" if result else None, error
)
async def get_screen_size(self) -> Dict[str, int]:
"""Get the screen dimensions."""
@@ -254,7 +282,7 @@ class TracingInterfaceWrapper:
error = e
raise
finally:
await self._record_call('get_screen_size', args, result, error)
await self._record_call("get_screen_size", args, result, error)
async def get_cursor_position(self) -> Dict[str, int]:
"""Get the current cursor position on screen."""
@@ -268,7 +296,7 @@ class TracingInterfaceWrapper:
error = e
raise
finally:
await self._record_call('get_cursor_position', args, result, error)
await self._record_call("get_cursor_position", args, result, error)
# Clipboard Actions
async def copy_to_clipboard(self) -> str:
@@ -284,12 +312,17 @@ class TracingInterfaceWrapper:
raise
finally:
# Don't include clipboard content in trace for privacy
await self._record_call('copy_to_clipboard', args, f'content_length_{len(result)}' if result else None, error)
await self._record_call(
"copy_to_clipboard",
args,
f"content_length_{len(result)}" if result else None,
error,
)
async def set_clipboard(self, text: str) -> None:
"""Set the clipboard content to the specified text."""
# Don't include clipboard content in trace for privacy
args = {'text_length': len(text)}
args = {"text_length": len(text)}
error = None
try:
result = await self._original_interface.set_clipboard(text)
@@ -298,4 +331,4 @@ class TracingInterfaceWrapper:
error = e
raise
finally:
await self._record_call('set_clipboard', args, None, error)
await self._record_call("set_clipboard", args, None, error)