From 73cd489ac355c9488fb72cf5912bcaf51bc7358b Mon Sep 17 00:00:00 2001 From: Dillon DuPont Date: Fri, 8 Aug 2025 10:53:26 -0400 Subject: [PATCH] Added dict-based custom computers --- libs/python/agent/agent/agent.py | 9 +- libs/python/agent/agent/computers/__init__.py | 21 +- libs/python/agent/agent/computers/cua.py | 41 ++-- libs/python/agent/agent/computers/custom.py | 185 ++++++++++++++++++ libs/python/agent/agent/types.py | 2 +- 5 files changed, 227 insertions(+), 31 deletions(-) create mode 100644 libs/python/agent/agent/computers/custom.py diff --git a/libs/python/agent/agent/agent.py b/libs/python/agent/agent/agent.py index 48bd7b54..536fb341 100644 --- a/libs/python/agent/agent/agent.py +++ b/libs/python/agent/agent/agent.py @@ -23,6 +23,7 @@ from .callbacks import ( ) from .computers import ( ComputerHandler, + is_agent_computer, make_computer_handler ) @@ -239,10 +240,6 @@ class ComputerAgent: async def _initialize_computers(self): """Initialize computer objects""" if not self.tool_schemas: - for tool in self.tools: - if hasattr(tool, '_initialized') and not tool._initialized: - await tool.run() - # Process tools and create tool schemas self.tool_schemas = self._process_tools() @@ -250,7 +247,7 @@ class ComputerAgent: computer_handler = None for schema in self.tool_schemas: if schema["type"] == "computer": - computer_handler = make_computer_handler(schema["computer"]) + computer_handler = await make_computer_handler(schema["computer"]) break self.computer_handler = computer_handler @@ -266,7 +263,7 @@ class ComputerAgent: for tool in self.tools: # Check if it's a computer object (has interface attribute) - if hasattr(tool, 'interface'): + if is_agent_computer(tool): # This is a computer tool - will be handled by agent loop schemas.append({ "type": "computer", diff --git a/libs/python/agent/agent/computers/__init__.py b/libs/python/agent/agent/computers/__init__.py index e2c4a07a..9b60308a 100644 --- a/libs/python/agent/agent/computers/__init__.py +++ b/libs/python/agent/agent/computers/__init__.py @@ -8,14 +8,21 @@ Computer library interface. from .base import ComputerHandler from .cua import cuaComputerHandler -from computer import Computer +from .custom import CustomComputerHandler +from computer import Computer as cuaComputer -def make_computer_handler(computer): +def is_agent_computer(computer): + """Check if the given computer is a ComputerHandler or CUA Computer.""" + return isinstance(computer, ComputerHandler) or \ + isinstance(computer, cuaComputer) or \ + (isinstance(computer, dict)) #and "screenshot" in computer) + +async def make_computer_handler(computer): """ Create a computer handler from a computer interface. Args: - computer: Either a ComputerHandler instance or a Computer instance + computer: Either a ComputerHandler instance, Computer instance, or dict of functions Returns: ComputerHandler: A computer handler instance @@ -25,6 +32,10 @@ def make_computer_handler(computer): """ if isinstance(computer, ComputerHandler): return computer - if isinstance(computer, Computer): - return cuaComputerHandler(computer) + if isinstance(computer, cuaComputer): + computer_handler = cuaComputerHandler(computer) + await computer_handler._initialize() + return computer_handler + if isinstance(computer, dict): + return CustomComputerHandler(computer) raise ValueError(f"Unsupported computer type: {type(computer)}") \ No newline at end of file diff --git a/libs/python/agent/agent/computers/cua.py b/libs/python/agent/agent/computers/cua.py index 30663116..34a984df 100644 --- a/libs/python/agent/agent/computers/cua.py +++ b/libs/python/agent/agent/computers/cua.py @@ -12,27 +12,36 @@ class cuaComputerHandler(ComputerHandler): def __init__(self, cua_computer: Computer): """Initialize with a computer interface (from tool schema).""" - self.interface = cua_computer.interface + self.cua_computer = cua_computer + self.interface = None + + async def _initialize(self): + if hasattr(self.cua_computer, '_initialized') and not self.cua_computer._initialized: + await self.cua_computer.run() + self.interface = self.cua_computer.interface # ==== Computer-Use-Preview Action Space ==== async def get_environment(self) -> Literal["windows", "mac", "linux", "browser"]: """Get the current environment type.""" - # For now, return a default - this could be enhanced to detect actual environment - return "windows" + # TODO: detect actual environment + return "linux" async def get_dimensions(self) -> tuple[int, int]: """Get screen dimensions as (width, height).""" + assert self.interface is not None screen_size = await self.interface.get_screen_size() return screen_size["width"], screen_size["height"] async def screenshot(self) -> str: """Take a screenshot and return as base64 string.""" + assert self.interface is not None screenshot_bytes = await self.interface.screenshot() return base64.b64encode(screenshot_bytes).decode('utf-8') async def click(self, x: int, y: int, button: str = "left") -> None: """Click at coordinates with specified button.""" + assert self.interface is not None if button == "left": await self.interface.left_click(x, y) elif button == "right": @@ -43,28 +52,34 @@ class cuaComputerHandler(ComputerHandler): async def double_click(self, x: int, y: int) -> None: """Double click at coordinates.""" + assert self.interface is not None await self.interface.double_click(x, y) async def scroll(self, x: int, y: int, scroll_x: int, scroll_y: int) -> None: """Scroll at coordinates with specified scroll amounts.""" + assert self.interface is not None await self.interface.move_cursor(x, y) await self.interface.scroll(scroll_x, scroll_y) async def type(self, text: str) -> None: """Type text.""" + assert self.interface is not None await self.interface.type_text(text) async def wait(self, ms: int = 1000) -> None: """Wait for specified milliseconds.""" + assert self.interface is not None import asyncio await asyncio.sleep(ms / 1000.0) async def move(self, x: int, y: int) -> None: """Move cursor to coordinates.""" + assert self.interface is not None await self.interface.move_cursor(x, y) async def keypress(self, keys: Union[List[str], str]) -> None: """Press key combination.""" + assert self.interface is not None if isinstance(keys, str): keys = keys.replace("-", "+").split("+") if len(keys) == 1: @@ -75,6 +90,7 @@ class cuaComputerHandler(ComputerHandler): async def drag(self, path: List[Dict[str, int]]) -> None: """Drag along specified path.""" + assert self.interface is not None if not path: return @@ -99,23 +115,10 @@ class cuaComputerHandler(ComputerHandler): # ==== Anthropic Computer Action Space ==== async def left_mouse_down(self, x: Optional[int] = None, y: Optional[int] = None) -> None: """Left mouse down at coordinates.""" + assert self.interface is not None await self.interface.mouse_down(x, y, button="left") async def left_mouse_up(self, x: Optional[int] = None, y: Optional[int] = None) -> None: """Left mouse up at coordinates.""" - await self.interface.mouse_up(x, y, button="left") - -def acknowledge_safety_check_callback(message: str, allow_always: bool = False) -> bool: - """Safety check callback for user acknowledgment.""" - if allow_always: - return True - response = input( - f"Safety Check Warning: {message}\nDo you want to acknowledge and proceed? (y/n): " - ).lower() - return response.strip() == "y" - - -def check_blocklisted_url(url: str) -> None: - """Check if URL is blocklisted (placeholder implementation).""" - # This would contain actual URL checking logic - pass + assert self.interface is not None + await self.interface.mouse_up(x, y, button="left") \ No newline at end of file diff --git a/libs/python/agent/agent/computers/custom.py b/libs/python/agent/agent/computers/custom.py new file mode 100644 index 00000000..19079bad --- /dev/null +++ b/libs/python/agent/agent/computers/custom.py @@ -0,0 +1,185 @@ +""" +Custom computer handler implementation that accepts a dictionary of functions. +""" + +import base64 +from typing import Dict, List, Any, Literal, Union, Optional, Callable +from PIL import Image +import io +from .base import ComputerHandler + + +class CustomComputerHandler(ComputerHandler): + """Computer handler that implements the Computer protocol using a dictionary of custom functions.""" + + def __init__(self, functions: Dict[str, Callable]): + """ + Initialize with a dictionary of functions. + + Args: + functions: Dictionary where keys are method names and values are callable functions. + Only 'screenshot' is required, all others are optional. + + Raises: + ValueError: If required 'screenshot' function is not provided. + """ + if 'screenshot' not in functions: + raise ValueError("'screenshot' function is required in functions dictionary") + + self.functions = functions + self._last_screenshot_size: Optional[tuple[int, int]] = None + + async def _get_value(self, attribute: str): + """ + Get value for an attribute, checking both 'get_{attribute}' and '{attribute}' keys. + + Args: + attribute: The attribute name to look for + + Returns: + The value from the functions dict, called if callable, returned directly if not + """ + # Check for 'get_{attribute}' first + get_key = f"get_{attribute}" + if get_key in self.functions: + value = self.functions[get_key] + return await value() if callable(value) else value + + # Check for '{attribute}' + if attribute in self.functions: + value = self.functions[attribute] + return await value() if callable(value) else value + + return None + + def _to_b64_str(self, img: Union[bytes, Image.Image, str]) -> str: + """ + Convert image to base64 string. + + Args: + img: Image as bytes, PIL Image, or base64 string + + Returns: + str: Base64 encoded image string + """ + if isinstance(img, str): + # Already a base64 string + return img + elif isinstance(img, bytes): + # Raw bytes + return base64.b64encode(img).decode('utf-8') + elif isinstance(img, Image.Image): + # PIL Image + buffer = io.BytesIO() + img.save(buffer, format='PNG') + return base64.b64encode(buffer.getvalue()).decode('utf-8') + else: + raise ValueError(f"Unsupported image type: {type(img)}") + + # ==== Computer-Use-Preview Action Space ==== + + async def get_environment(self) -> Literal["windows", "mac", "linux", "browser"]: + """Get the current environment type.""" + result = await self._get_value('environment') + return result if result is not None else "linux" + + async def get_dimensions(self) -> tuple[int, int]: + """Get screen dimensions as (width, height).""" + result = await self._get_value('dimensions') + if result is not None: + return result + + # Fallback: use last screenshot size if available + if not self._last_screenshot_size: + await self.screenshot() + assert self._last_screenshot_size is not None, "Failed to get screenshot size" + + return self._last_screenshot_size + + async def screenshot(self) -> str: + """Take a screenshot and return as base64 string.""" + result = await self.functions['screenshot']() + b64_str = self._to_b64_str(result) + + # Try to extract dimensions for fallback use + try: + if isinstance(result, Image.Image): + self._last_screenshot_size = result.size + elif isinstance(result, bytes): + # Try to decode bytes to get dimensions + img = Image.open(io.BytesIO(result)) + self._last_screenshot_size = img.size + except Exception: + # If we can't get dimensions, that's okay + pass + + return b64_str + + async def click(self, x: int, y: int, button: str = "left") -> None: + """Click at coordinates with specified button.""" + if 'click' in self.functions: + await self.functions['click'](x, y, button) + # No-op if not implemented + + async def double_click(self, x: int, y: int) -> None: + """Double click at coordinates.""" + if 'double_click' in self.functions: + await self.functions['double_click'](x, y) + # No-op if not implemented + + async def scroll(self, x: int, y: int, scroll_x: int, scroll_y: int) -> None: + """Scroll at coordinates with specified scroll amounts.""" + if 'scroll' in self.functions: + await self.functions['scroll'](x, y, scroll_x, scroll_y) + # No-op if not implemented + + async def type(self, text: str) -> None: + """Type text.""" + if 'type' in self.functions: + await self.functions['type'](text) + # No-op if not implemented + + async def wait(self, ms: int = 1000) -> None: + """Wait for specified milliseconds.""" + if 'wait' in self.functions: + await self.functions['wait'](ms) + else: + # Default implementation + import asyncio + await asyncio.sleep(ms / 1000.0) + + async def move(self, x: int, y: int) -> None: + """Move cursor to coordinates.""" + if 'move' in self.functions: + await self.functions['move'](x, y) + # No-op if not implemented + + async def keypress(self, keys: Union[List[str], str]) -> None: + """Press key combination.""" + if 'keypress' in self.functions: + await self.functions['keypress'](keys) + # No-op if not implemented + + async def drag(self, path: List[Dict[str, int]]) -> None: + """Drag along specified path.""" + if 'drag' in self.functions: + await self.functions['drag'](path) + # No-op if not implemented + + async def get_current_url(self) -> str: + """Get current URL (for browser environments).""" + if 'get_current_url' in self.functions: + return await self.functions['get_current_url']() + return "" # Default fallback + + async def left_mouse_down(self, x: Optional[int] = None, y: Optional[int] = None) -> None: + """Left mouse down at coordinates.""" + if 'left_mouse_down' in self.functions: + await self.functions['left_mouse_down'](x, y) + # No-op if not implemented + + async def left_mouse_up(self, x: Optional[int] = None, y: Optional[int] = None) -> None: + """Left mouse up at coordinates.""" + if 'left_mouse_up' in self.functions: + await self.functions['left_mouse_up'](x, y) + # No-op if not implemented diff --git a/libs/python/agent/agent/types.py b/libs/python/agent/agent/types.py index c56a9e5c..23946c86 100644 --- a/libs/python/agent/agent/types.py +++ b/libs/python/agent/agent/types.py @@ -9,7 +9,7 @@ from litellm import ResponseInputParam, ResponsesAPIResponse, ToolParam from collections.abc import Iterable # Agent input types -Messages = str | ResponseInputParam +Messages = str | ResponseInputParam | List[Dict[str, Any]] Tools = Optional[Iterable[ToolParam]] # Agent output types