mirror of
https://github.com/trycua/computer.git
synced 2026-01-18 11:30:17 -06:00
feat: added Computer.tracing for Recording Sessions
Signed-off-by: Jagjeevan Kashid <jagjeevandev97@gmail.com>
This commit is contained in:
@@ -2,6 +2,8 @@ from typing import Optional, List, Literal, Dict, Any, Union, TYPE_CHECKING, cas
|
||||
import asyncio
|
||||
from .models import Computer as ComputerConfig, Display
|
||||
from .interface.factory import InterfaceFactory
|
||||
from .tracing import ComputerTracing
|
||||
from .tracing_wrapper import TracingInterfaceWrapper
|
||||
import time
|
||||
from PIL import Image
|
||||
import io
|
||||
@@ -197,7 +199,12 @@ class Computer:
|
||||
|
||||
# Initialize with proper typing - None at first, will be set in run()
|
||||
self._interface = None
|
||||
self._original_interface = None # Keep reference to original interface
|
||||
self._tracing_wrapper = None # Tracing wrapper for interface
|
||||
self.use_host_computer_server = use_host_computer_server
|
||||
|
||||
# Initialize tracing
|
||||
self._tracing = ComputerTracing(self)
|
||||
|
||||
# Record initialization in telemetry (if enabled)
|
||||
if telemetry_enabled and is_telemetry_enabled():
|
||||
@@ -248,12 +255,14 @@ class Computer:
|
||||
# Create the interface with explicit type annotation
|
||||
from .interface.base import BaseComputerInterface
|
||||
|
||||
self._interface = cast(
|
||||
interface = cast(
|
||||
BaseComputerInterface,
|
||||
InterfaceFactory.create_interface_for_os(
|
||||
os=self.os_type, ip_address=ip_address # type: ignore[arg-type]
|
||||
),
|
||||
)
|
||||
self._interface = interface
|
||||
self._original_interface = interface
|
||||
|
||||
self.logger.info("Waiting for host computer server to be ready...")
|
||||
await self._interface.wait_for_ready()
|
||||
@@ -464,7 +473,7 @@ class Computer:
|
||||
|
||||
# Pass authentication credentials if using cloud provider
|
||||
if self.provider_type == VMProviderType.CLOUD and self.api_key and self.config.name:
|
||||
self._interface = cast(
|
||||
interface = cast(
|
||||
BaseComputerInterface,
|
||||
InterfaceFactory.create_interface_for_os(
|
||||
os=self.os_type,
|
||||
@@ -474,13 +483,16 @@ class Computer:
|
||||
),
|
||||
)
|
||||
else:
|
||||
self._interface = cast(
|
||||
interface = cast(
|
||||
BaseComputerInterface,
|
||||
InterfaceFactory.create_interface_for_os(
|
||||
os=self.os_type,
|
||||
ip_address=ip_address
|
||||
),
|
||||
)
|
||||
|
||||
self._interface = interface
|
||||
self._original_interface = interface
|
||||
|
||||
# Wait for the WebSocket interface to be ready
|
||||
self.logger.info("Connecting to WebSocket interface...")
|
||||
@@ -736,7 +748,7 @@ class Computer:
|
||||
"""Get the computer interface for interacting with the VM.
|
||||
|
||||
Returns:
|
||||
The computer interface
|
||||
The computer interface (wrapped with tracing if tracing is active)
|
||||
"""
|
||||
if not hasattr(self, "_interface") or self._interface is None:
|
||||
error_msg = "Computer interface not initialized. Call run() first."
|
||||
@@ -746,8 +758,28 @@ class Computer:
|
||||
)
|
||||
raise RuntimeError(error_msg)
|
||||
|
||||
# Return tracing wrapper if tracing is active and we have an original interface
|
||||
if (self._tracing.is_tracing and
|
||||
hasattr(self, "_original_interface") and
|
||||
self._original_interface is not None):
|
||||
# Create wrapper if it doesn't exist or if the original interface changed
|
||||
if (not hasattr(self, "_tracing_wrapper") or
|
||||
self._tracing_wrapper is None or
|
||||
self._tracing_wrapper._original_interface != self._original_interface):
|
||||
self._tracing_wrapper = TracingInterfaceWrapper(self._original_interface, self._tracing)
|
||||
return self._tracing_wrapper
|
||||
|
||||
return self._interface
|
||||
|
||||
@property
|
||||
def tracing(self) -> ComputerTracing:
|
||||
"""Get the computer tracing instance for recording sessions.
|
||||
|
||||
Returns:
|
||||
ComputerTracing: The tracing instance
|
||||
"""
|
||||
return self._tracing
|
||||
|
||||
@property
|
||||
def telemetry_enabled(self) -> bool:
|
||||
"""Check if telemetry is enabled for this computer instance.
|
||||
|
||||
325
libs/python/computer/computer/tracing.py
Normal file
325
libs/python/computer/computer/tracing.py
Normal file
@@ -0,0 +1,325 @@
|
||||
"""
|
||||
Computer tracing functionality for recording sessions.
|
||||
|
||||
This module provides a Computer.tracing API inspired by Playwright's tracing functionality,
|
||||
allowing users to record computer interactions for debugging, training, and analysis.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import time
|
||||
import uuid
|
||||
import zipfile
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
from PIL import Image
|
||||
import io
|
||||
import base64
|
||||
|
||||
|
||||
class ComputerTracing:
|
||||
"""
|
||||
Computer tracing class that records computer interactions and saves them to disk.
|
||||
|
||||
This class provides a flexible API for recording computer sessions with configurable
|
||||
options for what to record (screenshots, API calls, video, etc.).
|
||||
"""
|
||||
|
||||
def __init__(self, computer_instance):
|
||||
"""
|
||||
Initialize the tracing instance.
|
||||
|
||||
Args:
|
||||
computer_instance: The Computer instance to trace
|
||||
"""
|
||||
self._computer = computer_instance
|
||||
self._is_tracing = False
|
||||
self._trace_config: Dict[str, Any] = {}
|
||||
self._trace_data: List[Dict[str, Any]] = []
|
||||
self._trace_start_time: Optional[float] = None
|
||||
self._trace_id: Optional[str] = None
|
||||
self._trace_dir: Optional[Path] = None
|
||||
self._screenshot_count = 0
|
||||
|
||||
@property
|
||||
def is_tracing(self) -> bool:
|
||||
"""Check if tracing is currently active."""
|
||||
return self._is_tracing
|
||||
|
||||
async def start(self, config: Optional[Dict[str, Any]] = None) -> None:
|
||||
"""
|
||||
Start tracing with the specified configuration.
|
||||
|
||||
Args:
|
||||
config: Tracing configuration dict with options:
|
||||
- video: bool - Record video frames (default: False)
|
||||
- screenshots: bool - Record screenshots (default: True)
|
||||
- api_calls: bool - Record API calls and results (default: True)
|
||||
- accessibility_tree: bool - Record accessibility tree snapshots (default: False)
|
||||
- metadata: bool - Record custom metadata (default: True)
|
||||
- name: str - Custom trace name (default: auto-generated)
|
||||
- path: str - Custom trace directory path (default: auto-generated)
|
||||
"""
|
||||
if self._is_tracing:
|
||||
raise RuntimeError("Tracing is already active. Call stop() first.")
|
||||
|
||||
# Set default configuration
|
||||
default_config = {
|
||||
'video': False,
|
||||
'screenshots': True,
|
||||
'api_calls': True,
|
||||
'accessibility_tree': False,
|
||||
'metadata': True,
|
||||
'name': None,
|
||||
'path': None
|
||||
}
|
||||
|
||||
self._trace_config = {**default_config, **(config or {})}
|
||||
|
||||
# Generate trace ID and directory
|
||||
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
||||
self._trace_id = self._trace_config.get('name') or f"trace_{timestamp}_{str(uuid.uuid4())[:8]}"
|
||||
|
||||
if self._trace_config.get('path'):
|
||||
self._trace_dir = Path(self._trace_config['path'])
|
||||
else:
|
||||
self._trace_dir = Path.cwd() / "traces" / self._trace_id
|
||||
|
||||
# Create trace directory
|
||||
self._trace_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Initialize trace data
|
||||
self._trace_data = []
|
||||
self._trace_start_time = time.time()
|
||||
self._screenshot_count = 0
|
||||
self._is_tracing = True
|
||||
|
||||
# Record initial metadata
|
||||
await self._record_event('trace_start', {
|
||||
'trace_id': self._trace_id,
|
||||
'config': self._trace_config,
|
||||
'timestamp': self._trace_start_time,
|
||||
'computer_info': {
|
||||
'os_type': self._computer.os_type,
|
||||
'provider_type': str(self._computer.provider_type),
|
||||
'image': self._computer.image
|
||||
}
|
||||
})
|
||||
|
||||
# Take initial screenshot if enabled
|
||||
if self._trace_config.get('screenshots'):
|
||||
await self._take_screenshot('initial_screenshot')
|
||||
|
||||
async def stop(self, options: Optional[Dict[str, Any]] = None) -> str:
|
||||
"""
|
||||
Stop tracing and save the trace data.
|
||||
|
||||
Args:
|
||||
options: Stop options dict with:
|
||||
- path: str - Custom output path for the trace archive
|
||||
- format: str - Output format ('zip' or 'dir', default: 'zip')
|
||||
|
||||
Returns:
|
||||
str: Path to the saved trace file or directory
|
||||
"""
|
||||
if not self._is_tracing:
|
||||
raise RuntimeError("Tracing is not active. Call start() first.")
|
||||
|
||||
if self._trace_start_time is None or self._trace_dir is None or self._trace_id is None:
|
||||
raise RuntimeError("Tracing state is invalid.")
|
||||
|
||||
# Record final metadata
|
||||
await self._record_event('trace_end', {
|
||||
'timestamp': time.time(),
|
||||
'duration': time.time() - self._trace_start_time,
|
||||
'total_events': len(self._trace_data),
|
||||
'screenshot_count': self._screenshot_count
|
||||
})
|
||||
|
||||
# Take final screenshot if enabled
|
||||
if self._trace_config.get('screenshots'):
|
||||
await self._take_screenshot('final_screenshot')
|
||||
|
||||
# Save trace metadata
|
||||
metadata_path = self._trace_dir / "trace_metadata.json"
|
||||
with open(metadata_path, 'w') as f:
|
||||
json.dump({
|
||||
'trace_id': self._trace_id,
|
||||
'config': self._trace_config,
|
||||
'start_time': self._trace_start_time,
|
||||
'end_time': time.time(),
|
||||
'duration': time.time() - self._trace_start_time,
|
||||
'total_events': len(self._trace_data),
|
||||
'screenshot_count': self._screenshot_count,
|
||||
'events': self._trace_data
|
||||
}, f, indent=2, default=str)
|
||||
|
||||
# Determine output format and path
|
||||
output_format = options.get('format', 'zip') if options else 'zip'
|
||||
custom_path = options.get('path') if options else None
|
||||
|
||||
if output_format == 'zip':
|
||||
# Create zip file
|
||||
if custom_path:
|
||||
zip_path = Path(custom_path)
|
||||
else:
|
||||
zip_path = self._trace_dir.parent / f"{self._trace_id}.zip"
|
||||
|
||||
await self._create_zip_archive(zip_path)
|
||||
output_path = str(zip_path)
|
||||
else:
|
||||
# Return directory path
|
||||
if custom_path:
|
||||
# Move directory to custom path
|
||||
custom_dir = Path(custom_path)
|
||||
if custom_dir.exists():
|
||||
import shutil
|
||||
shutil.rmtree(custom_dir)
|
||||
self._trace_dir.rename(custom_dir)
|
||||
output_path = str(custom_dir)
|
||||
else:
|
||||
output_path = str(self._trace_dir)
|
||||
|
||||
# Reset tracing state
|
||||
self._is_tracing = False
|
||||
self._trace_config = {}
|
||||
self._trace_data = []
|
||||
self._trace_start_time = None
|
||||
self._trace_id = None
|
||||
self._screenshot_count = 0
|
||||
|
||||
return output_path
|
||||
|
||||
async def _record_event(self, event_type: str, data: Dict[str, Any]) -> None:
|
||||
"""
|
||||
Record a trace event.
|
||||
|
||||
Args:
|
||||
event_type: Type of event (e.g., 'click', 'type', 'screenshot')
|
||||
data: Event data
|
||||
"""
|
||||
if not self._is_tracing or self._trace_start_time is None or self._trace_dir is None:
|
||||
return
|
||||
|
||||
event = {
|
||||
'type': event_type,
|
||||
'timestamp': time.time(),
|
||||
'relative_time': time.time() - self._trace_start_time,
|
||||
'data': data
|
||||
}
|
||||
|
||||
self._trace_data.append(event)
|
||||
|
||||
# Save event to individual file for large traces
|
||||
event_file = self._trace_dir / f"event_{len(self._trace_data):06d}_{event_type}.json"
|
||||
with open(event_file, 'w') as f:
|
||||
json.dump(event, f, indent=2, default=str)
|
||||
|
||||
async def _take_screenshot(self, name: str = 'screenshot') -> Optional[str]:
|
||||
"""
|
||||
Take a screenshot and save it to the trace.
|
||||
|
||||
Args:
|
||||
name: Name for the screenshot
|
||||
|
||||
Returns:
|
||||
Optional[str]: Path to the saved screenshot, or None if screenshots disabled
|
||||
"""
|
||||
if not self._trace_config.get('screenshots') or not self._computer.interface or self._trace_dir is None:
|
||||
return None
|
||||
|
||||
try:
|
||||
screenshot_bytes = await self._computer.interface.screenshot()
|
||||
self._screenshot_count += 1
|
||||
|
||||
screenshot_filename = f"{self._screenshot_count:06d}_{name}.png"
|
||||
screenshot_path = self._trace_dir / screenshot_filename
|
||||
|
||||
with open(screenshot_path, 'wb') as f:
|
||||
f.write(screenshot_bytes)
|
||||
|
||||
return str(screenshot_path)
|
||||
except Exception as e:
|
||||
# Log error but don't fail the trace
|
||||
if hasattr(self._computer, 'logger'):
|
||||
self._computer.logger.warning(f"Failed to take screenshot: {e}")
|
||||
return None
|
||||
|
||||
async def _create_zip_archive(self, zip_path: Path) -> None:
|
||||
"""
|
||||
Create a zip archive of the trace directory.
|
||||
|
||||
Args:
|
||||
zip_path: Path where to save the zip file
|
||||
"""
|
||||
if self._trace_dir is None:
|
||||
raise RuntimeError("Trace directory is not set")
|
||||
|
||||
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
|
||||
for file_path in self._trace_dir.rglob('*'):
|
||||
if file_path.is_file():
|
||||
arcname = file_path.relative_to(self._trace_dir)
|
||||
zipf.write(file_path, arcname)
|
||||
|
||||
async def record_api_call(self, method: str, args: Dict[str, Any], result: Any = None, error: Optional[Exception] = None) -> None:
|
||||
"""
|
||||
Record an API call event.
|
||||
|
||||
Args:
|
||||
method: The method name that was called
|
||||
args: Arguments passed to the method
|
||||
result: Result returned by the method
|
||||
error: Exception raised by the method, if any
|
||||
"""
|
||||
if not self._trace_config.get('api_calls'):
|
||||
return
|
||||
|
||||
# Take screenshot after certain actions if enabled
|
||||
screenshot_path = None
|
||||
screenshot_actions = ['left_click', 'right_click', 'double_click', 'type_text', 'press_key', 'hotkey']
|
||||
if method in screenshot_actions and self._trace_config.get('screenshots'):
|
||||
screenshot_path = await self._take_screenshot(f"after_{method}")
|
||||
|
||||
# Record accessibility tree after certain actions if enabled
|
||||
if method in screenshot_actions and self._trace_config.get('accessibility_tree'):
|
||||
await self.record_accessibility_tree()
|
||||
|
||||
await self._record_event('api_call', {
|
||||
'method': method,
|
||||
'args': args,
|
||||
'result': str(result) if result is not None else None,
|
||||
'error': str(error) if error else None,
|
||||
'screenshot': screenshot_path,
|
||||
'success': error is None
|
||||
})
|
||||
|
||||
async def record_accessibility_tree(self) -> None:
|
||||
"""Record the current accessibility tree if enabled."""
|
||||
if not self._trace_config.get('accessibility_tree') or not self._computer.interface:
|
||||
return
|
||||
|
||||
try:
|
||||
accessibility_tree = await self._computer.interface.get_accessibility_tree()
|
||||
await self._record_event('accessibility_tree', {
|
||||
'tree': accessibility_tree
|
||||
})
|
||||
except Exception as e:
|
||||
if hasattr(self._computer, 'logger'):
|
||||
self._computer.logger.warning(f"Failed to record accessibility tree: {e}")
|
||||
|
||||
async def add_metadata(self, key: str, value: Any) -> None:
|
||||
"""
|
||||
Add custom metadata to the trace.
|
||||
|
||||
Args:
|
||||
key: Metadata key
|
||||
value: Metadata value
|
||||
"""
|
||||
if not self._trace_config.get('metadata'):
|
||||
return
|
||||
|
||||
await self._record_event('metadata', {
|
||||
'key': key,
|
||||
'value': value
|
||||
})
|
||||
301
libs/python/computer/computer/tracing_wrapper.py
Normal file
301
libs/python/computer/computer/tracing_wrapper.py
Normal file
@@ -0,0 +1,301 @@
|
||||
"""
|
||||
Tracing wrapper for computer interface that records API calls.
|
||||
"""
|
||||
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
from .interface.base import BaseComputerInterface
|
||||
|
||||
|
||||
class TracingInterfaceWrapper:
|
||||
"""
|
||||
Wrapper class that intercepts computer interface calls and records them for tracing.
|
||||
"""
|
||||
|
||||
def __init__(self, original_interface: BaseComputerInterface, tracing_instance):
|
||||
"""
|
||||
Initialize the tracing wrapper.
|
||||
|
||||
Args:
|
||||
original_interface: The original computer interface
|
||||
tracing_instance: The ComputerTracing instance
|
||||
"""
|
||||
self._original_interface = original_interface
|
||||
self._tracing = tracing_instance
|
||||
|
||||
def __getattr__(self, name):
|
||||
"""
|
||||
Delegate attribute access to the original interface if not found in wrapper.
|
||||
"""
|
||||
return getattr(self._original_interface, name)
|
||||
|
||||
async def _record_call(self, method_name: str, args: Dict[str, Any], result: Any = None, error: Optional[Exception] = None):
|
||||
"""
|
||||
Record an API call for tracing.
|
||||
|
||||
Args:
|
||||
method_name: Name of the method called
|
||||
args: Arguments passed to the method
|
||||
result: Result returned by the method
|
||||
error: Exception raised, if any
|
||||
"""
|
||||
if self._tracing.is_tracing:
|
||||
await self._tracing.record_api_call(method_name, args, result, error)
|
||||
|
||||
# Mouse Actions
|
||||
async def left_click(self, x: Optional[int] = None, y: Optional[int] = None, delay: Optional[float] = None) -> None:
|
||||
"""Perform a left mouse button click."""
|
||||
args = {'x': x, 'y': y, 'delay': delay}
|
||||
error = None
|
||||
try:
|
||||
result = await self._original_interface.left_click(x, y, delay)
|
||||
return result
|
||||
except Exception as e:
|
||||
error = e
|
||||
raise
|
||||
finally:
|
||||
await self._record_call('left_click', args, None, error)
|
||||
|
||||
async def right_click(self, x: Optional[int] = None, y: Optional[int] = None, delay: Optional[float] = None) -> None:
|
||||
"""Perform a right mouse button click."""
|
||||
args = {'x': x, 'y': y, 'delay': delay}
|
||||
error = None
|
||||
try:
|
||||
result = await self._original_interface.right_click(x, y, delay)
|
||||
return result
|
||||
except Exception as e:
|
||||
error = e
|
||||
raise
|
||||
finally:
|
||||
await self._record_call('right_click', args, None, error)
|
||||
|
||||
async def double_click(self, x: Optional[int] = None, y: Optional[int] = None, delay: Optional[float] = None) -> None:
|
||||
"""Perform a double left mouse button click."""
|
||||
args = {'x': x, 'y': y, 'delay': delay}
|
||||
error = None
|
||||
try:
|
||||
result = await self._original_interface.double_click(x, y, delay)
|
||||
return result
|
||||
except Exception as e:
|
||||
error = e
|
||||
raise
|
||||
finally:
|
||||
await self._record_call('double_click', args, None, error)
|
||||
|
||||
async def move_cursor(self, x: int, y: int, delay: Optional[float] = None) -> None:
|
||||
"""Move the cursor to the specified screen coordinates."""
|
||||
args = {'x': x, 'y': y, 'delay': delay}
|
||||
error = None
|
||||
try:
|
||||
result = await self._original_interface.move_cursor(x, y, delay)
|
||||
return result
|
||||
except Exception as e:
|
||||
error = e
|
||||
raise
|
||||
finally:
|
||||
await self._record_call('move_cursor', args, None, error)
|
||||
|
||||
async def drag_to(self, x: int, y: int, button: str = "left", duration: float = 0.5, delay: Optional[float] = None) -> None:
|
||||
"""Drag from current position to specified coordinates."""
|
||||
args = {'x': x, 'y': y, 'button': button, 'duration': duration, 'delay': delay}
|
||||
error = None
|
||||
try:
|
||||
result = await self._original_interface.drag_to(x, y, button, duration, delay)
|
||||
return result
|
||||
except Exception as e:
|
||||
error = e
|
||||
raise
|
||||
finally:
|
||||
await self._record_call('drag_to', args, None, error)
|
||||
|
||||
async def drag(self, path: List[Tuple[int, int]], button: str = "left", duration: float = 0.5, delay: Optional[float] = None) -> None:
|
||||
"""Drag the cursor along a path of coordinates."""
|
||||
args = {'path': path, 'button': button, 'duration': duration, 'delay': delay}
|
||||
error = None
|
||||
try:
|
||||
result = await self._original_interface.drag(path, button, duration, delay)
|
||||
return result
|
||||
except Exception as e:
|
||||
error = e
|
||||
raise
|
||||
finally:
|
||||
await self._record_call('drag', args, None, error)
|
||||
|
||||
# Keyboard Actions
|
||||
async def key_down(self, key: str, delay: Optional[float] = None) -> None:
|
||||
"""Press and hold a key."""
|
||||
args = {'key': key, 'delay': delay}
|
||||
error = None
|
||||
try:
|
||||
result = await self._original_interface.key_down(key, delay)
|
||||
return result
|
||||
except Exception as e:
|
||||
error = e
|
||||
raise
|
||||
finally:
|
||||
await self._record_call('key_down', args, None, error)
|
||||
|
||||
async def key_up(self, key: str, delay: Optional[float] = None) -> None:
|
||||
"""Release a previously pressed key."""
|
||||
args = {'key': key, 'delay': delay}
|
||||
error = None
|
||||
try:
|
||||
result = await self._original_interface.key_up(key, delay)
|
||||
return result
|
||||
except Exception as e:
|
||||
error = e
|
||||
raise
|
||||
finally:
|
||||
await self._record_call('key_up', args, None, error)
|
||||
|
||||
async def type_text(self, text: str, delay: Optional[float] = None) -> None:
|
||||
"""Type the specified text string."""
|
||||
args = {'text': text, 'delay': delay}
|
||||
error = None
|
||||
try:
|
||||
result = await self._original_interface.type_text(text, delay)
|
||||
return result
|
||||
except Exception as e:
|
||||
error = e
|
||||
raise
|
||||
finally:
|
||||
await self._record_call('type_text', args, None, error)
|
||||
|
||||
async def press_key(self, key: str, delay: Optional[float] = None) -> None:
|
||||
"""Press and release a single key."""
|
||||
args = {'key': key, 'delay': delay}
|
||||
error = None
|
||||
try:
|
||||
result = await self._original_interface.press_key(key, delay)
|
||||
return result
|
||||
except Exception as e:
|
||||
error = e
|
||||
raise
|
||||
finally:
|
||||
await self._record_call('press_key', args, None, error)
|
||||
|
||||
async def hotkey(self, *keys: str, delay: Optional[float] = None) -> None:
|
||||
"""Press multiple keys simultaneously (keyboard shortcut)."""
|
||||
args = {'keys': keys, 'delay': delay}
|
||||
error = None
|
||||
try:
|
||||
result = await self._original_interface.hotkey(*keys, delay=delay)
|
||||
return result
|
||||
except Exception as e:
|
||||
error = e
|
||||
raise
|
||||
finally:
|
||||
await self._record_call('hotkey', args, None, error)
|
||||
|
||||
# Scrolling Actions
|
||||
async def scroll(self, x: int, y: int, delay: Optional[float] = None) -> None:
|
||||
"""Scroll the mouse wheel by specified amounts."""
|
||||
args = {'x': x, 'y': y, 'delay': delay}
|
||||
error = None
|
||||
try:
|
||||
result = await self._original_interface.scroll(x, y, delay)
|
||||
return result
|
||||
except Exception as e:
|
||||
error = e
|
||||
raise
|
||||
finally:
|
||||
await self._record_call('scroll', args, None, error)
|
||||
|
||||
async def scroll_down(self, clicks: int = 1, delay: Optional[float] = None) -> None:
|
||||
"""Scroll down by the specified number of clicks."""
|
||||
args = {'clicks': clicks, 'delay': delay}
|
||||
error = None
|
||||
try:
|
||||
result = await self._original_interface.scroll_down(clicks, delay)
|
||||
return result
|
||||
except Exception as e:
|
||||
error = e
|
||||
raise
|
||||
finally:
|
||||
await self._record_call('scroll_down', args, None, error)
|
||||
|
||||
async def scroll_up(self, clicks: int = 1, delay: Optional[float] = None) -> None:
|
||||
"""Scroll up by the specified number of clicks."""
|
||||
args = {'clicks': clicks, 'delay': delay}
|
||||
error = None
|
||||
try:
|
||||
result = await self._original_interface.scroll_up(clicks, delay)
|
||||
return result
|
||||
except Exception as e:
|
||||
error = e
|
||||
raise
|
||||
finally:
|
||||
await self._record_call('scroll_up', args, None, error)
|
||||
|
||||
# Screen Actions
|
||||
async def screenshot(self) -> bytes:
|
||||
"""Take a screenshot."""
|
||||
args = {}
|
||||
error = None
|
||||
result = None
|
||||
try:
|
||||
result = await self._original_interface.screenshot()
|
||||
return result
|
||||
except Exception as e:
|
||||
error = e
|
||||
raise
|
||||
finally:
|
||||
# For screenshots, we don't want to include the raw bytes in the trace args
|
||||
await self._record_call('screenshot', args, 'screenshot_taken' if result else None, error)
|
||||
|
||||
async def get_screen_size(self) -> Dict[str, int]:
|
||||
"""Get the screen dimensions."""
|
||||
args = {}
|
||||
error = None
|
||||
result = None
|
||||
try:
|
||||
result = await self._original_interface.get_screen_size()
|
||||
return result
|
||||
except Exception as e:
|
||||
error = e
|
||||
raise
|
||||
finally:
|
||||
await self._record_call('get_screen_size', args, result, error)
|
||||
|
||||
async def get_cursor_position(self) -> Dict[str, int]:
|
||||
"""Get the current cursor position on screen."""
|
||||
args = {}
|
||||
error = None
|
||||
result = None
|
||||
try:
|
||||
result = await self._original_interface.get_cursor_position()
|
||||
return result
|
||||
except Exception as e:
|
||||
error = e
|
||||
raise
|
||||
finally:
|
||||
await self._record_call('get_cursor_position', args, result, error)
|
||||
|
||||
# Clipboard Actions
|
||||
async def copy_to_clipboard(self) -> str:
|
||||
"""Get the current clipboard content."""
|
||||
args = {}
|
||||
error = None
|
||||
result = None
|
||||
try:
|
||||
result = await self._original_interface.copy_to_clipboard()
|
||||
return result
|
||||
except Exception as e:
|
||||
error = e
|
||||
raise
|
||||
finally:
|
||||
# Don't include clipboard content in trace for privacy
|
||||
await self._record_call('copy_to_clipboard', args, f'content_length_{len(result)}' if result else None, error)
|
||||
|
||||
async def set_clipboard(self, text: str) -> None:
|
||||
"""Set the clipboard content to the specified text."""
|
||||
# Don't include clipboard content in trace for privacy
|
||||
args = {'text_length': len(text)}
|
||||
error = None
|
||||
try:
|
||||
result = await self._original_interface.set_clipboard(text)
|
||||
return result
|
||||
except Exception as e:
|
||||
error = e
|
||||
raise
|
||||
finally:
|
||||
await self._record_call('set_clipboard', args, None, error)
|
||||
Reference in New Issue
Block a user