Clean unused refs

Commit 167765a4d6 (parent e32b64590a)
Author: f-trycua
Date: 2025-03-24 11:11:32 +01:00
22 changed files with 327 additions and 1453 deletions

View File

@@ -1,18 +1,15 @@
"""Example demonstrating the ComputerAgent capabilities with the Omni provider."""
import os
import asyncio
import logging
import traceback
from pathlib import Path
import signal
import json
from computer import Computer
# Import the unified agent class and types
-from agent import AgentLoop, LLMProvider, LLM
-from agent.core.computer_agent import ComputerAgent
+from agent import ComputerAgent, LLMProvider, LLM, AgentLoop
# Import utility functions
from utils import load_dotenv_files, handle_sigint
@@ -22,7 +19,7 @@ logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
-async def run_omni_agent_example():
+async def run_agent_example():
"""Run example of using the ComputerAgent with OpenAI and Omni provider."""
print("\n=== Example: ComputerAgent with OpenAI and Omni provider ===")
@@ -35,8 +32,8 @@ async def run_omni_agent_example():
computer=computer,
# loop=AgentLoop.ANTHROPIC,
loop=AgentLoop.OMNI,
-# model=LLM(provider=LLMProvider.OPENAI, name="gpt-4.5-preview"),
-model=LLM(provider=LLMProvider.ANTHROPIC, name="claude-3-7-sonnet-20250219"),
+model=LLM(provider=LLMProvider.OPENAI, name="gpt-4.5-preview"),
+# model=LLM(provider=LLMProvider.ANTHROPIC, name="claude-3-7-sonnet-20250219"),
save_trajectory=True,
only_n_most_recent_images=3,
verbosity=logging.DEBUG,
@@ -51,13 +48,12 @@ async def run_omni_agent_example():
"Focus on the Composer text area, then write and submit a task to help resolve the GitHub issue.",
]
-async with agent:
-    for i, task in enumerate(tasks):
-        print(f"\nExecuting task {i}/{len(tasks)}: {task}")
-        async for result in agent.run(task):
-            print(result)
+for i, task in enumerate(tasks):
+    print(f"\nExecuting task {i}/{len(tasks)}: {task}")
+    async for result in agent.run(task):
+        print(result)
-        print(f"\n✅ Task {i+1}/{len(tasks)} completed: {task}")
+    print(f"\n✅ Task {i+1}/{len(tasks)} completed: {task}")
except Exception as e:
logger.error(f"Error in run_omni_agent_example: {e}")
@@ -81,7 +77,7 @@ def main():
# Register signal handler for graceful exit
signal.signal(signal.SIGINT, handle_sigint)
-asyncio.run(run_omni_agent_example())
+asyncio.run(run_agent_example())
except Exception as e:
print(f"Error running example: {e}")
traceback.print_exc()
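Taken together, the example now drives everything through the single ComputerAgent entry point. A minimal sketch of the resulting usage, assuming only the constructor arguments that appear in the diff above (the task string is illustrative):

import asyncio
import logging
from computer import Computer
from agent import ComputerAgent, AgentLoop, LLM, LLMProvider

async def main():
    computer = Computer()  # assumes the default constructor is sufficient here
    agent = ComputerAgent(
        computer=computer,
        loop=AgentLoop.OMNI,
        model=LLM(provider=LLMProvider.OPENAI, name="gpt-4.5-preview"),
        save_trajectory=True,
        only_n_most_recent_images=3,
        verbosity=logging.INFO,
    )
    # As in the updated example, each task is streamed result by result.
    async for result in agent.run("Open a browser and search for 'trycua'"):
        print(result)

asyncio.run(main())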

View File

@@ -49,6 +49,7 @@ except Exception as e:
logger.warning(f"Error initializing telemetry: {e}")
from .providers.omni.types import LLMProvider, LLM
-from .types.base import AgentLoop
+from .core.loop import AgentLoop
+from .core.computer_agent import ComputerAgent
-__all__ = ["AgentLoop", "LLMProvider", "LLM"]
+__all__ = ["AgentLoop", "LLMProvider", "LLM", "ComputerAgent"]

View File

@@ -3,6 +3,7 @@
import logging
import asyncio
from abc import ABC, abstractmethod
+from enum import Enum, auto
from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple
from datetime import datetime
@@ -12,6 +13,14 @@ from .experiment import ExperimentManager
logger = logging.getLogger(__name__)
+class AgentLoop(Enum):
+    """Enumeration of available loop types."""
+    ANTHROPIC = auto()  # Anthropic implementation
+    OMNI = auto()  # OmniLoop implementation
+    # Add more loop types as needed
class BaseLoop(ABC):
"""Base class for agent loops that handle message processing and tool execution."""
@@ -191,156 +200,3 @@ class BaseLoop(ABC):
"""
if self.experiment_manager:
self.experiment_manager.save_screenshot(img_base64, action_type)
def _create_openai_compatible_response(
self, response: Any, messages: List[Dict[str, Any]]
) -> Dict[str, Any]:
"""Create an OpenAI computer use agent compatible response format.
Args:
response: The original API response
messages: List of messages in standard OpenAI format
Returns:
A response formatted according to OpenAI's computer use agent standard
"""
import json
# Create a unique ID for this response
response_id = f"resp_{datetime.now().strftime('%Y%m%d%H%M%S')}_{id(response)}"
reasoning_id = f"rs_{response_id}"
action_id = f"cu_{response_id}"
call_id = f"call_{response_id}"
# Extract the last assistant message
assistant_msg = None
for msg in reversed(messages):
if msg["role"] == "assistant":
assistant_msg = msg
break
if not assistant_msg:
# If no assistant message found, create a default one
assistant_msg = {"role": "assistant", "content": "No response available"}
# Initialize output array
output_items = []
# Extract reasoning and action details from the response
content = assistant_msg["content"]
reasoning_text = None
action_details = None
# Extract reasoning and action from different content formats
if isinstance(content, str):
try:
# Try to parse JSON
parsed_content = json.loads(content)
reasoning_text = parsed_content.get("Explanation", "")
# Extract action details
action = parsed_content.get("Action", "")
position = parsed_content.get("Position", {})
text_input = parsed_content.get("Text", "")
if action.lower() == "click" and position:
action_details = {
"type": "click",
"button": "left",
"x": position.get("x", 100),
"y": position.get("y", 100),
}
elif action.lower() == "type" and text_input:
action_details = {
"type": "type",
"text": text_input,
}
elif action.lower() == "scroll":
action_details = {
"type": "scroll",
"x": 100,
"y": 100,
"scroll_x": position.get("delta_x", 0),
"scroll_y": position.get("delta_y", 0),
}
except json.JSONDecodeError:
# If not valid JSON, use the content as reasoning
reasoning_text = content
elif isinstance(content, list):
# Handle list of content blocks (like Anthropic format)
for item in content:
if isinstance(item, dict):
if item.get("type") == "text":
# Collect text blocks for reasoning
if reasoning_text is None:
reasoning_text = ""
reasoning_text += item.get("text", "")
elif item.get("type") == "tool_use":
# Extract action from tool_use (similar to Anthropic format)
tool_input = item.get("input", {})
if "click" in tool_input or "position" in tool_input:
position = tool_input.get("click", tool_input.get("position", {}))
if isinstance(position, dict) and "x" in position and "y" in position:
action_details = {
"type": "click",
"button": "left",
"x": position.get("x", 100),
"y": position.get("y", 100),
}
elif "type" in tool_input or "text" in tool_input:
action_details = {
"type": "type",
"text": tool_input.get("type", tool_input.get("text", "")),
}
elif "scroll" in tool_input:
scroll = tool_input.get("scroll", {})
action_details = {
"type": "scroll",
"x": 100,
"y": 100,
"scroll_x": scroll.get("x", 0),
"scroll_y": scroll.get("y", 0),
}
# Add reasoning item if we have text content
if reasoning_text:
output_items.append(
{
"type": "reasoning",
"id": reasoning_id,
"summary": [
{
"type": "summary_text",
"text": reasoning_text[:200], # Truncate to reasonable length
}
],
}
)
# If no action details extracted, use default
if not action_details:
action_details = {
"type": "click",
"button": "left",
"x": 100,
"y": 100,
}
# Add computer_call item
computer_call = {
"type": "computer_call",
"id": action_id,
"call_id": call_id,
"action": action_details,
"pending_safety_checks": [],
"status": "completed",
}
output_items.append(computer_call)
# Create the OpenAI-compatible response format
return {
"output": output_items,
"id": response_id,
# Include the original response for compatibility
"response": {"choices": [{"message": assistant_msg, "finish_reason": "stop"}]},
}
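With AgentLoop defined next to BaseLoop, loop selection becomes a plain enum dispatch. A minimal sketch of how a caller might branch on it; the module paths in the local imports are assumptions, since the real wiring inside ComputerAgent is not part of this diff:

from agent import AgentLoop

def select_loop_class(loop: AgentLoop):
    # Hypothetical dispatch table mirroring the enum members above.
    if loop is AgentLoop.ANTHROPIC:
        from agent.providers.anthropic.loop import AnthropicLoop  # assumed path
        return AnthropicLoop
    if loop is AgentLoop.OMNI:
        from agent.providers.omni.loop import OmniLoop  # assumed path
        return OmniLoop
    raise ValueError(f"Unsupported loop type: {loop}")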

View File

@@ -1,13 +1,11 @@
"""Message handling utilities for agent."""
import base64
-from datetime import datetime
-from io import BytesIO
import logging
-from typing import Any, Dict, List, Optional, Union, cast, Tuple
-from PIL import Image
import json
+from typing import Any, Dict, List, Optional, Union, Tuple
from dataclasses import dataclass
import re
+from ..providers.omni.parser import ParseResult
logger = logging.getLogger(__name__)
@@ -399,3 +397,219 @@ class StandardMessageManager:
result.append(openai_msg)
return result
async def create_openai_compatible_response(
self,
response: Any,
messages: List[Dict[str, Any]],
parsed_screen: Optional[ParseResult] = None,
parser: Optional[Any] = None,
model: Optional[str] = None,
) -> Dict[str, Any]:
"""Create an OpenAI computer use agent compatible response format.
Args:
response: The original API response
messages: List of messages in standard OpenAI format
parsed_screen: Optional pre-parsed screen information
parser: Optional parser instance for coordinate calculation
model: Optional model name
Returns:
A response formatted according to OpenAI's computer use agent standard
"""
from datetime import datetime
import time
# Create a unique ID for this response
response_id = f"resp_{datetime.now().strftime('%Y%m%d%H%M%S')}_{id(response)}"
reasoning_id = f"rs_{response_id}"
action_id = f"cu_{response_id}"
call_id = f"call_{response_id}"
# Extract the last assistant message
assistant_msg = None
for msg in reversed(messages):
if msg["role"] == "assistant":
assistant_msg = msg
break
if not assistant_msg:
# If no assistant message found, create a default one
assistant_msg = {"role": "assistant", "content": "No response available"}
# Initialize output array
output_items = []
# Extract reasoning and action details from the response
content = assistant_msg["content"]
reasoning_text = None
action_details = None
for item in content:
if isinstance(item, dict) and item.get("type") == "text":
try:
# Try to parse JSON from text block
text_content = item.get("text", "")
parsed_json = json.loads(text_content)
# Get reasoning text
if reasoning_text is None:
reasoning_text = parsed_json.get("Explanation", "")
# Extract action details
action = parsed_json.get("Action", "").lower()
text_input = parsed_json.get("Text", "")
value = parsed_json.get("Value", "") # Also handle Value field
box_id = parsed_json.get("Box ID") # Extract Box ID
if action in ["click", "left_click"]:
# Always calculate coordinates from Box ID for click actions
x, y = 100, 100 # Default fallback values
if parsed_screen and box_id is not None and parser is not None:
try:
box_id_int = (
box_id
if isinstance(box_id, int)
else int(str(box_id)) if str(box_id).isdigit() else None
)
if box_id_int is not None:
# Use the parser's method to calculate coordinates
x, y = await parser.calculate_click_coordinates(
box_id_int, parsed_screen
)
logger.info(
f"Extracted coordinates for Box ID {box_id_int}: ({x}, {y})"
)
except Exception as e:
logger.error(
f"Error extracting coordinates for Box ID {box_id}: {str(e)}"
)
action_details = {
"type": "click",
"button": "left",
"box_id": (
(
box_id
if isinstance(box_id, int)
else int(box_id) if str(box_id).isdigit() else None
)
if box_id is not None
else None
),
"x": x,
"y": y,
}
elif action in ["type", "type_text"] and (text_input or value):
action_details = {
"type": "type",
"text": text_input or value,
}
elif action == "hotkey" and value:
action_details = {
"type": "hotkey",
"keys": value,
}
elif action == "scroll":
# Use default coordinates for scrolling
delta_x = 0
delta_y = 0
# Try to extract scroll delta values from content if available
scroll_data = parsed_json.get("Scroll", {})
if scroll_data:
delta_x = scroll_data.get("delta_x", 0)
delta_y = scroll_data.get("delta_y", 0)
action_details = {
"type": "scroll",
"x": 100,
"y": 100,
"scroll_x": delta_x,
"scroll_y": delta_y,
}
elif action == "none":
# Handle case when action is None (task completion)
action_details = {"type": "none", "description": "Task completed"}
except json.JSONDecodeError:
# If not JSON, just use as reasoning text
if reasoning_text is None:
reasoning_text = ""
reasoning_text += item.get("text", "")
# Add reasoning item if we have text content
if reasoning_text:
output_items.append(
{
"type": "reasoning",
"id": reasoning_id,
"summary": [
{
"type": "summary_text",
"text": reasoning_text[:200], # Truncate to reasonable length
}
],
}
)
# If no action details extracted, use default
if not action_details:
action_details = {
"type": "click",
"button": "left",
"x": 100,
"y": 100,
}
# Add computer_call item
computer_call = {
"type": "computer_call",
"id": action_id,
"call_id": call_id,
"action": action_details,
"pending_safety_checks": [],
"status": "completed",
}
output_items.append(computer_call)
# Create the OpenAI-compatible response format with all expected fields
return {
"id": response_id,
"object": "response",
"created_at": int(time.time()),
"status": "completed",
"error": None,
"incomplete_details": None,
"instructions": None,
"max_output_tokens": None,
"model": model or "unknown",
"output": output_items,
"parallel_tool_calls": True,
"previous_response_id": None,
"reasoning": {"effort": "medium", "generate_summary": "concise"},
"store": True,
"temperature": 1.0,
"text": {"format": {"type": "text"}},
"tool_choice": "auto",
"tools": [
{
"type": "computer_use_preview",
"display_height": 768,
"display_width": 1024,
"environment": "mac",
}
],
"top_p": 1.0,
"truncation": "auto",
"usage": {
"input_tokens": 0, # Placeholder values
"input_tokens_details": {"cached_tokens": 0},
"output_tokens": 0, # Placeholder values
"output_tokens_details": {"reasoning_tokens": 0},
"total_tokens": 0, # Placeholder values
},
"user": None,
"metadata": {},
# Include the original response for backward compatibility
"response": {"choices": [{"message": assistant_msg, "finish_reason": "stop"}]},
}
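This method is now the single source of the OpenAI computer-use response shape for both loops. A sketch of calling it and reading the action back out, inside an async context, assuming manager is a StandardMessageManager and raw_response is whatever the provider returned:

messages = [
    {
        "role": "assistant",
        "content": [
            {
                "type": "text",
                "text": '{"Explanation": "Click the submit button", "Action": "click", "Box ID": 3}',
            }
        ],
    }
]
compat = await manager.create_openai_compatible_response(
    response=raw_response,
    messages=messages,
    parsed_screen=parsed_screen,  # ParseResult from OmniParser, or None
    parser=parser,  # needed so Box ID 3 can be resolved to pixel coordinates
    model="gpt-4.5-preview",
)
call = next(item for item in compat["output"] if item["type"] == "computer_call")
print(call["action"])  # e.g. {"type": "click", "button": "left", "box_id": 3, "x": ..., "y": ...}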

View File

@@ -2,8 +2,7 @@
import logging
import asyncio
-from typing import Any, Dict, List, Optional
-from httpx import ConnectError, ReadTimeout
+from typing import List
from anthropic.types.beta import (
BetaMessage,

View File

@@ -275,8 +275,14 @@ class AnthropicLoop(BaseLoop):
for msg in reversed(self.message_history):
if msg["role"] == "assistant":
# Create OpenAI-compatible response and add to queue
-openai_compatible_response = self._create_openai_compatible_response(
-    msg, response
+openai_compatible_response = (
+    await self.message_manager.create_openai_compatible_response(
+        response=response,
+        messages=self.message_history,
+        parsed_screen=None,
+        parser=None,
+        model=self.model,
+    )
)
await queue.put(openai_compatible_response)
break
@@ -295,108 +301,6 @@ class AnthropicLoop(BaseLoop):
)
await queue.put(None)
def _create_openai_compatible_response(
self, assistant_msg: Dict[str, Any], original_response: Any
) -> Dict[str, Any]:
"""Create an OpenAI computer use agent compatible response format.
Args:
assistant_msg: The assistant message in standard OpenAI format
original_response: The original API response object for ID generation
Returns:
A response formatted according to OpenAI's computer use agent standard
"""
# Create a unique ID for this response
response_id = f"resp_{datetime.now().strftime('%Y%m%d%H%M%S')}_{id(original_response)}"
reasoning_id = f"rs_{response_id}"
action_id = f"cu_{response_id}"
call_id = f"call_{response_id}"
# Extract reasoning and action details from the response
content = assistant_msg["content"]
# Initialize output array
output_items = []
# Add reasoning item if we have text content
reasoning_text = None
action_details = None
# AnthropicLoop expects a list of content blocks with type "text" or "tool_use"
if isinstance(content, list):
for item in content:
if isinstance(item, dict) and item.get("type") == "text":
reasoning_text = item.get("text", "")
elif isinstance(item, dict) and item.get("type") == "tool_use":
action_details = item
else:
# Fallback for string content
reasoning_text = content if isinstance(content, str) else None
# If we have reasoning text, add reasoning item
if reasoning_text:
output_items.append(
{
"type": "reasoning",
"id": reasoning_id,
"summary": [
{
"type": "summary_text",
"text": reasoning_text[:200], # Truncate to reasonable length
}
],
}
)
# Add computer_call item with action details if available
computer_call = {
"type": "computer_call",
"id": action_id,
"call_id": call_id,
"action": {"type": "click", "button": "left", "x": 100, "y": 100}, # Default action
"pending_safety_checks": [],
"status": "completed",
}
# If we have action details from a tool_use, update the computer_call
if action_details:
# Try to map tool_use to computer_call action
tool_input = action_details.get("input", {})
if "click" in tool_input or "position" in tool_input:
position = tool_input.get("click", tool_input.get("position", {}))
if isinstance(position, dict) and "x" in position and "y" in position:
computer_call["action"] = {
"type": "click",
"button": "left",
"x": position.get("x", 100),
"y": position.get("y", 100),
}
elif "type" in tool_input or "text" in tool_input:
computer_call["action"] = {
"type": "type",
"text": tool_input.get("type", tool_input.get("text", "")),
}
elif "scroll" in tool_input:
scroll = tool_input.get("scroll", {})
computer_call["action"] = {
"type": "scroll",
"x": 100,
"y": 100,
"scroll_x": scroll.get("x", 0),
"scroll_y": scroll.get("y", 0),
}
output_items.append(computer_call)
# Create the OpenAI-compatible response format
return {
"output": output_items,
"id": response_id,
# Include the original format for backward compatibility
"response": {"choices": [{"message": assistant_msg, "finish_reason": "stop"}]},
}
###########################################
# RESPONSE AND CALLBACK HANDLING
###########################################

View File

@@ -1,21 +1,8 @@
"""Omni provider implementation."""
# The OmniComputerAgent has been replaced by the unified ComputerAgent
# which can be found in agent.core.computer_agent
from .types import LLMProvider
-from .image_utils import (
-    decode_base64_image,
-    encode_image_base64,
-    clean_base64_data,
-    extract_base64_from_text,
-    get_image_dimensions,
-)
+from .image_utils import decode_base64_image
-__all__ = [
-    "LLMProvider",
-    "decode_base64_image",
-    "encode_image_base64",
-    "clean_base64_data",
-    "extract_base64_from_text",
-    "get_image_dimensions",
-]
+__all__ = ["LLMProvider", "decode_base64_image"]

View File

@@ -5,7 +5,6 @@ from typing import Dict, Any, Tuple
import json
from .parser import ParseResult
-from ...core.visualization import calculate_element_center
logger = logging.getLogger(__name__)
@@ -48,8 +47,10 @@ class ActionExecutor:
box_id = int(content["Box ID"])
logger.info(f"Processing Box ID: {box_id}")
-# Calculate click coordinates
-x, y = await self.calculate_click_coordinates(box_id, parsed_screen)
+# Calculate click coordinates using parser
+x, y = await self.loop.parser.calculate_click_coordinates(
+    box_id, parsed_screen
+)
logger.info(f"Calculated coordinates: x={x}, y={y}")
kwargs["x"] = x
@@ -72,7 +73,9 @@ class ActionExecutor:
elif action == "drag_to":
try:
box_id = int(content["Box ID"])
-x, y = await self.calculate_click_coordinates(box_id, parsed_screen)
+x, y = await self.loop.parser.calculate_click_coordinates(
+    box_id, parsed_screen
+)
kwargs.update(
{
"x": x,
@@ -207,58 +210,3 @@ class ActionExecutor:
except Exception as e:
logger.error(f"Error in execute_action: {str(e)}")
return False
async def calculate_click_coordinates(
self, box_id: int, parsed_screen: ParseResult
) -> Tuple[int, int]:
"""Calculate click coordinates based on box ID.
Args:
box_id: The ID of the box to click
parsed_screen: The parsed screen information
Returns:
Tuple of (x, y) coordinates
Raises:
ValueError: If box_id is invalid or missing from parsed screen
"""
# First try to use structured elements data
logger.info(f"Elements count: {len(parsed_screen.elements)}")
# Try to find element with matching ID
for element in parsed_screen.elements:
if element.id == box_id:
logger.info(f"Found element with ID {box_id}: {element}")
bbox = element.bbox
# Get screen dimensions from the metadata if available, or fallback
width = parsed_screen.metadata.width if parsed_screen.metadata else 1920
height = parsed_screen.metadata.height if parsed_screen.metadata else 1080
logger.info(f"Screen dimensions: width={width}, height={height}")
# Create a dictionary from the element's bbox for calculate_element_center
bbox_dict = {"x1": bbox.x1, "y1": bbox.y1, "x2": bbox.x2, "y2": bbox.y2}
center_x, center_y = calculate_element_center(bbox_dict, width, height)
logger.info(f"Calculated center: ({center_x}, {center_y})")
# Validate coordinates - if they're (0,0) or unreasonably small,
# use a default position in the center of the screen
if center_x == 0 and center_y == 0:
logger.warning("Got (0,0) coordinates, using fallback position")
center_x = width // 2
center_y = height // 2
logger.info(f"Using fallback center: ({center_x}, {center_y})")
return center_x, center_y
# If we couldn't find the box, use center of screen
logger.error(
f"Box ID {box_id} not found in structured elements (count={len(parsed_screen.elements)})"
)
# Use center of screen as fallback
width = parsed_screen.metadata.width if parsed_screen.metadata else 1920
height = parsed_screen.metadata.height if parsed_screen.metadata else 1080
logger.warning(f"Using fallback position in center of screen ({width//2}, {height//2})")
return width // 2, height // 2
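The executor no longer keeps a private copy of the coordinate math; it borrows it from the parser the loop already owns. The click and drag_to branches therefore share one code path (sketch of the pattern, as it appears inside ActionExecutor):

box_id = int(content["Box ID"])
x, y = await self.loop.parser.calculate_click_coordinates(box_id, parsed_screen)
kwargs["x"], kwargs["y"] = x, y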

View File

@@ -1,78 +0,0 @@
"""Omni callback manager implementation."""
import logging
from typing import Any, Dict, Optional, Set
from ...core.callbacks import BaseCallbackManager, ContentCallback, ToolCallback, APICallback
from ...types.tools import ToolResult
logger = logging.getLogger(__name__)
class OmniCallbackManager(BaseCallbackManager):
"""Callback manager for multi-provider support."""
def __init__(
self,
content_callback: ContentCallback,
tool_callback: ToolCallback,
api_callback: APICallback,
):
"""Initialize Omni callback manager.
Args:
content_callback: Callback for content updates
tool_callback: Callback for tool execution results
api_callback: Callback for API interactions
"""
super().__init__(
content_callback=content_callback,
tool_callback=tool_callback,
api_callback=api_callback
)
self._active_tools: Set[str] = set()
def on_content(self, content: Any) -> None:
"""Handle content updates.
Args:
content: Content update data
"""
logger.debug(f"Content update: {content}")
self.content_callback(content)
def on_tool_result(self, result: ToolResult, tool_id: str) -> None:
"""Handle tool execution results.
Args:
result: Tool execution result
tool_id: ID of the tool
"""
logger.debug(f"Tool result for {tool_id}: {result}")
self.tool_callback(result, tool_id)
def on_api_interaction(
self,
request: Any,
response: Any,
error: Optional[Exception] = None
) -> None:
"""Handle API interactions.
Args:
request: API request data
response: API response data
error: Optional error that occurred
"""
if error:
logger.error(f"API error: {str(error)}")
else:
logger.debug(f"API interaction - Request: {request}, Response: {response}")
self.api_callback(request, response, error)
def get_active_tools(self) -> Set[str]:
"""Get currently active tools.
Returns:
Set of active tool names
"""
return self._active_tools.copy()

View File

@@ -32,75 +32,3 @@ def decode_base64_image(img_base64: str) -> Optional[Image.Image]:
except Exception as e:
logger.error(f"Error decoding base64 image: {str(e)}")
return None
def encode_image_base64(img: Image.Image, format: str = "PNG") -> str:
"""Encode a PIL Image to base64.
Args:
img: PIL Image to encode
format: Image format (PNG, JPEG, etc.)
Returns:
Base64 encoded image string
"""
try:
buffered = BytesIO()
img.save(buffered, format=format)
return base64.b64encode(buffered.getvalue()).decode("utf-8")
except Exception as e:
logger.error(f"Error encoding image to base64: {str(e)}")
return ""
def clean_base64_data(img_base64: str) -> str:
"""Clean base64 image data by removing data URL prefix.
Args:
img_base64: Base64 encoded image, may include data URL prefix
Returns:
Clean base64 string without prefix
"""
if img_base64.startswith("data:image"):
return img_base64.split(",")[1]
return img_base64
def extract_base64_from_text(text: str) -> Optional[str]:
"""Extract base64 image data from a text string.
Args:
text: Text potentially containing base64 image data
Returns:
Base64 string or None if not found
"""
# Look for data URL pattern
data_url_pattern = r"data:image/[^;]+;base64,([a-zA-Z0-9+/=]+)"
match = re.search(data_url_pattern, text)
if match:
return match.group(1)
# Look for plain base64 pattern (basic heuristic)
base64_pattern = r"([a-zA-Z0-9+/=]{100,})"
match = re.search(base64_pattern, text)
if match:
return match.group(1)
return None
def get_image_dimensions(img_base64: str) -> Tuple[int, int]:
"""Get the dimensions of a base64 encoded image.
Args:
img_base64: Base64 encoded image
Returns:
Tuple of (width, height) or (0, 0) if decoding fails
"""
img = decode_base64_image(img_base64)
if img:
return img.size
return (0, 0)
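Only decode_base64_image survives this cleanup. Any caller that still needs the removed prefix handling can inline it; a one-function sketch equivalent to the deleted clean_base64_data:

def clean_base64_data(img_base64: str) -> str:
    # Same behavior as the deleted helper: strip a data-URL prefix if present.
    if img_base64.startswith("data:image"):
        return img_base64.split(",", 1)[1]
    return img_base64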

View File

@@ -1,17 +1,15 @@
"""Omni-specific agent loop implementation."""
import logging
-from typing import Any, Dict, List, Optional, Tuple, AsyncGenerator, Union
-from PIL import Image
+from typing import Any, Dict, List, Optional, Tuple, AsyncGenerator
import json
import re
import os
from datetime import datetime
import asyncio
from httpx import ConnectError, ReadTimeout
-from typing import cast
-from .parser import OmniParser, ParseResult, ParserMetadata, UIElement
+from .parser import OmniParser, ParseResult
from ...core.loop import BaseLoop
from ...core.visualization import VisualizationHelper
from ...core.messages import StandardMessageManager, ImageRetentionConfig
@@ -20,8 +18,6 @@ from .types import LLMProvider
from .clients.openai import OpenAIClient
from .clients.anthropic import AnthropicClient
from .prompts import SYSTEM_PROMPT
-from .utils import compress_image_base64
-from .image_utils import decode_base64_image, clean_base64_data
from .api_handler import OmniAPIHandler
from .action_executor import ActionExecutor
@@ -644,9 +640,15 @@ class OmniLoop(BaseLoop):
# Update whether an action screenshot was saved this turn
action_screenshot_saved = action_screenshot_saved or new_screenshot_saved
-# Create OpenAI-compatible response format
-openai_compatible_response = await self._create_openai_compatible_response(
-    response, self.message_manager.messages, parsed_screen
+# Create OpenAI-compatible response format - directly use StandardMessageManager method
+openai_compatible_response = (
+    await self.message_manager.create_openai_compatible_response(
+        response=response,
+        messages=self.message_manager.messages,
+        parsed_screen=parsed_screen,
+        parser=self.parser,
+        model=self.model,
+    )
)
# Yield the response to the caller
@@ -678,215 +680,3 @@ class OmniLoop(BaseLoop):
# Create a brief delay before retrying
await asyncio.sleep(1)
async def _create_openai_compatible_response(
self,
response: Any,
messages: List[Dict[str, Any]],
parsed_screen: Optional[ParseResult] = None,
) -> Dict[str, Any]:
"""Create an OpenAI computer use agent compatible response format.
Args:
response: The original API response
messages: List of messages in standard OpenAI format
parsed_screen: Optional pre-parsed screen information
Returns:
A response formatted according to OpenAI's computer use agent standard
"""
from datetime import datetime
import time
# Create a unique ID for this response
response_id = f"resp_{datetime.now().strftime('%Y%m%d%H%M%S')}_{id(response)}"
reasoning_id = f"rs_{response_id}"
action_id = f"cu_{response_id}"
call_id = f"call_{response_id}"
# Extract the last assistant message
assistant_msg = None
for msg in reversed(messages):
if msg["role"] == "assistant":
assistant_msg = msg
break
if not assistant_msg:
# If no assistant message found, create a default one
assistant_msg = {"role": "assistant", "content": "No response available"}
# Initialize output array
output_items = []
# Extract reasoning and action details from the response
content = assistant_msg["content"]
reasoning_text = None
action_details = None
for item in content:
if isinstance(item, dict) and item.get("type") == "text":
try:
# Try to parse JSON from text block
text_content = item.get("text", "")
parsed_json = json.loads(text_content)
# Get reasoning text
if reasoning_text is None:
reasoning_text = parsed_json.get("Explanation", "")
# Extract action details
action = parsed_json.get("Action", "").lower()
text_input = parsed_json.get("Text", "")
value = parsed_json.get("Value", "") # Also handle Value field
box_id = parsed_json.get("Box ID") # Extract Box ID
if action in ["click", "left_click"]:
# Always calculate coordinates from Box ID for click actions
x, y = 100, 100 # Default fallback values
if parsed_screen and box_id is not None:
try:
box_id_int = (
box_id
if isinstance(box_id, int)
else int(str(box_id)) if str(box_id).isdigit() else None
)
if box_id_int is not None:
# Use the ActionExecutor's method to calculate coordinates with await
x, y = await self.action_executor.calculate_click_coordinates(
box_id_int, parsed_screen
)
logger.info(
f"Extracted coordinates for Box ID {box_id_int}: ({x}, {y})"
)
except Exception as e:
logger.error(
f"Error extracting coordinates for Box ID {box_id}: {str(e)}"
)
action_details = {
"type": "click",
"button": "left",
"box_id": (
(
box_id
if isinstance(box_id, int)
else int(box_id) if str(box_id).isdigit() else None
)
if box_id is not None
else None
),
"x": x,
"y": y,
}
elif action in ["type", "type_text"] and (text_input or value):
action_details = {
"type": "type",
"text": text_input or value,
}
elif action == "hotkey" and value:
action_details = {
"type": "hotkey",
"keys": value,
}
elif action == "scroll":
# Use default coordinates for scrolling
delta_x = 0
delta_y = 0
# Try to extract scroll delta values from content if available
scroll_data = parsed_json.get("Scroll", {})
if scroll_data:
delta_x = scroll_data.get("delta_x", 0)
delta_y = scroll_data.get("delta_y", 0)
action_details = {
"type": "scroll",
"x": 100,
"y": 100,
"scroll_x": delta_x,
"scroll_y": delta_y,
}
elif action == "none":
# Handle case when action is None (task completion)
action_details = {"type": "none", "description": "Task completed"}
except json.JSONDecodeError:
# If not JSON, just use as reasoning text
if reasoning_text is None:
reasoning_text = ""
reasoning_text += item.get("text", "")
# Add reasoning item if we have text content
if reasoning_text:
output_items.append(
{
"type": "reasoning",
"id": reasoning_id,
"summary": [
{
"type": "summary_text",
"text": reasoning_text[:200], # Truncate to reasonable length
}
],
}
)
# If no action details extracted, use default
if not action_details:
action_details = {
"type": "click",
"button": "left",
"x": 100,
"y": 100,
}
# Add computer_call item
computer_call = {
"type": "computer_call",
"id": action_id,
"call_id": call_id,
"action": action_details,
"pending_safety_checks": [],
"status": "completed",
}
output_items.append(computer_call)
# Create the OpenAI-compatible response format with all expected fields
return {
"id": response_id,
"object": "response",
"created_at": int(time.time()),
"status": "completed",
"error": None,
"incomplete_details": None,
"instructions": None,
"max_output_tokens": None,
"model": self.model,
"output": output_items,
"parallel_tool_calls": True,
"previous_response_id": None,
"reasoning": {"effort": "medium", "generate_summary": "concise"},
"store": True,
"temperature": 1.0,
"text": {"format": {"type": "text"}},
"tool_choice": "auto",
"tools": [
{
"type": "computer_use_preview",
"display_height": 768,
"display_width": 1024,
"environment": "mac",
}
],
"top_p": 1.0,
"truncation": "auto",
"usage": {
"input_tokens": 0, # Placeholder values
"input_tokens_details": {"cached_tokens": 0},
"output_tokens": 0, # Placeholder values
"output_tokens_details": {"reasoning_tokens": 0},
"total_tokens": 0, # Placeholder values
},
"user": None,
"metadata": {},
# Include the original response for backward compatibility
"response": {"choices": [{"message": assistant_msg, "finish_reason": "stop"}]},
}

View File

@@ -3,14 +3,11 @@
import logging
from typing import Any, Dict, List, Optional, Tuple
import base64
-from PIL import Image
-from io import BytesIO
import json
-import torch
# Import from the SOM package
from som import OmniParser as OmniDetectParser
-from som.models import ParseResult, BoundingBox, UIElement, ImageData, ParserMetadata
+from som.models import ParseResult, ParserMetadata
logger = logging.getLogger(__name__)
@@ -251,3 +248,60 @@ class OmniParser:
except Exception as e:
logger.error(f"Error formatting messages: {str(e)}")
return messages # Return original messages on error
async def calculate_click_coordinates(
self, box_id: int, parsed_screen: ParseResult
) -> Tuple[int, int]:
"""Calculate click coordinates based on box ID.
Args:
box_id: The ID of the box to click
parsed_screen: The parsed screen information
Returns:
Tuple of (x, y) coordinates
Raises:
ValueError: If box_id is invalid or missing from parsed screen
"""
# First try to use structured elements data
logger.info(f"Elements count: {len(parsed_screen.elements)}")
# Try to find element with matching ID
for element in parsed_screen.elements:
if element.id == box_id:
logger.info(f"Found element with ID {box_id}: {element}")
bbox = element.bbox
# Get screen dimensions from the metadata if available, or fallback
width = parsed_screen.metadata.width if parsed_screen.metadata else 1920
height = parsed_screen.metadata.height if parsed_screen.metadata else 1080
logger.info(f"Screen dimensions: width={width}, height={height}")
# Create a dictionary from the element's bbox for calculate_element_center
bbox_dict = {"x1": bbox.x1, "y1": bbox.y1, "x2": bbox.x2, "y2": bbox.y2}
from ...core.visualization import calculate_element_center
center_x, center_y = calculate_element_center(bbox_dict, width, height)
logger.info(f"Calculated center: ({center_x}, {center_y})")
# Validate coordinates - if they're (0,0) or unreasonably small,
# use a default position in the center of the screen
if center_x == 0 and center_y == 0:
logger.warning("Got (0,0) coordinates, using fallback position")
center_x = width // 2
center_y = height // 2
logger.info(f"Using fallback center: ({center_x}, {center_y})")
return center_x, center_y
# If we couldn't find the box, use center of screen
logger.error(
f"Box ID {box_id} not found in structured elements (count={len(parsed_screen.elements)})"
)
# Use center of screen as fallback
width = parsed_screen.metadata.width if parsed_screen.metadata else 1920
height = parsed_screen.metadata.height if parsed_screen.metadata else 1080
logger.warning(f"Using fallback position in center of screen ({width//2}, {height//2})")
return width // 2, height // 2
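The coordinate math is easiest to see with numbers. If calculate_element_center treats the bbox as normalized to [0, 1] and scales by the screen size, which the width/height arguments here suggest (calculate_element_center itself is not shown in this diff), then a box spanning (0.25, 0.10) to (0.35, 0.20) on a 1920x1080 screen resolves as:

# Hedged worked example - assumes normalized bbox coordinates scaled by screen size.
bbox = {"x1": 0.25, "y1": 0.10, "x2": 0.35, "y2": 0.20}
width, height = 1920, 1080
center_x = int((bbox["x1"] + bbox["x2"]) / 2 * width)   # 0.30 * 1920 = 576
center_y = int((bbox["y1"] + bbox["y2"]) / 2 * height)  # 0.15 * 1080 = 162
print(center_x, center_y)  # 576 162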

View File

@@ -1,91 +0,0 @@
# """Omni tool manager implementation."""
# from typing import Dict, List, Type, Any
# from computer import Computer
# from ...core.tools import BaseToolManager, BashTool, EditTool
# class OmniToolManager(BaseToolManager):
# """Tool manager for multi-provider support."""
# def __init__(self, computer: Computer):
# """Initialize Omni tool manager.
# Args:
# computer: Computer instance for tools
# """
# super().__init__(computer)
# def get_anthropic_tools(self) -> List[Dict[str, Any]]:
# """Get tools formatted for Anthropic API.
# Returns:
# List of tool parameters in Anthropic format
# """
# tools: List[Dict[str, Any]] = []
# # Map base tools to Anthropic format
# for tool in self.tools.values():
# if isinstance(tool, BashTool):
# tools.append({
# "type": "bash_20241022",
# "name": tool.name
# })
# elif isinstance(tool, EditTool):
# tools.append({
# "type": "text_editor_20241022",
# "name": "str_replace_editor"
# })
# return tools
# def get_openai_tools(self) -> List[Dict]:
# """Get tools formatted for OpenAI API.
# Returns:
# List of tool parameters in OpenAI format
# """
# tools = []
# # Map base tools to OpenAI format
# for tool in self.tools.values():
# tools.append({
# "type": "function",
# "function": tool.get_schema()
# })
# return tools
# def get_groq_tools(self) -> List[Dict]:
# """Get tools formatted for Groq API.
# Returns:
# List of tool parameters in Groq format
# """
# tools = []
# # Map base tools to Groq format
# for tool in self.tools.values():
# tools.append({
# "type": "function",
# "function": tool.get_schema()
# })
# return tools
# def get_qwen_tools(self) -> List[Dict]:
# """Get tools formatted for Qwen API.
# Returns:
# List of tool parameters in Qwen format
# """
# tools = []
# # Map base tools to Qwen format
# for tool in self.tools.values():
# tools.append({
# "type": "function",
# "function": tool.get_schema()
# })
# return tools

View File

@@ -1,11 +1 @@
"""Omni provider tools - compatible with multiple LLM providers."""
-from .bash import OmniBashTool
-from .computer import OmniComputerTool
-from .manager import OmniToolManager
-__all__ = [
-    "OmniBashTool",
-    "OmniComputerTool",
-    "OmniToolManager",
-]

View File

@@ -1,69 +0,0 @@
"""Provider-agnostic implementation of the BashTool."""
import logging
from typing import Any, Dict
from computer.computer import Computer
from ....core.tools.bash import BaseBashTool
from ....core.tools import ToolResult
class OmniBashTool(BaseBashTool):
"""A provider-agnostic implementation of the bash tool."""
name = "bash"
logger = logging.getLogger(__name__)
def __init__(self, computer: Computer):
"""Initialize the BashTool.
Args:
computer: Computer instance, may be used for related operations
"""
super().__init__(computer)
def to_params(self) -> Dict[str, Any]:
"""Convert tool to provider-agnostic parameters.
Returns:
Dictionary with tool parameters
"""
return {
"name": self.name,
"description": "A tool that allows the agent to run bash commands",
"parameters": {
"command": {"type": "string", "description": "The bash command to execute"},
"restart": {
"type": "boolean",
"description": "Whether to restart the bash session",
},
},
}
async def __call__(self, **kwargs) -> ToolResult:
"""Execute the bash tool with the provided arguments.
Args:
command: The bash command to execute
restart: Whether to restart the bash session
Returns:
ToolResult with the command output
"""
command = kwargs.get("command")
restart = kwargs.get("restart", False)
if not command:
return ToolResult(error="Command is required")
self.logger.info(f"Executing bash command: {command}")
exit_code, stdout, stderr = await self.run_command(command)
output = stdout
error = None
if exit_code != 0:
error = f"Command exited with code {exit_code}: {stderr}"
return ToolResult(output=output, error=error)

View File

@@ -1,217 +0,0 @@
"""Provider-agnostic implementation of the ComputerTool."""
import logging
import base64
import io
from typing import Any, Dict
from PIL import Image
from computer.computer import Computer
from ....core.tools.computer import BaseComputerTool
from ....core.tools import ToolResult, ToolError
class OmniComputerTool(BaseComputerTool):
"""A provider-agnostic implementation of the computer tool."""
name = "computer"
logger = logging.getLogger(__name__)
def __init__(self, computer: Computer):
"""Initialize the ComputerTool.
Args:
computer: Computer instance for screen interactions
"""
super().__init__(computer)
# Initialize dimensions to None, will be set in initialize_dimensions
self.width = None
self.height = None
self.display_num = None
def to_params(self) -> Dict[str, Any]:
"""Convert tool to provider-agnostic parameters.
Returns:
Dictionary with tool parameters
"""
return {
"name": self.name,
"description": "A tool that allows the agent to interact with the screen, keyboard, and mouse",
"parameters": {
"action": {
"type": "string",
"enum": [
"key",
"type",
"mouse_move",
"left_click",
"left_click_drag",
"right_click",
"middle_click",
"double_click",
"screenshot",
"cursor_position",
"scroll",
],
"description": "The action to perform on the computer",
},
"text": {
"type": "string",
"description": "Text to type or key to press, required for 'key' and 'type' actions",
},
"coordinate": {
"type": "array",
"items": {"type": "integer"},
"description": "X,Y coordinates for mouse actions like click and move",
},
"direction": {
"type": "string",
"enum": ["up", "down"],
"description": "Direction to scroll, used with the 'scroll' action",
},
"amount": {
"type": "integer",
"description": "Amount to scroll, used with the 'scroll' action",
},
},
**self.options,
}
async def __call__(self, **kwargs) -> ToolResult:
"""Execute the computer tool with the provided arguments.
Args:
action: The action to perform
text: Text to type or key to press (for key/type actions)
coordinate: X,Y coordinates (for mouse actions)
direction: Direction to scroll (for scroll action)
amount: Amount to scroll (for scroll action)
Returns:
ToolResult with the action output and optional screenshot
"""
# Ensure dimensions are initialized
if self.width is None or self.height is None:
await self.initialize_dimensions()
action = kwargs.get("action")
text = kwargs.get("text")
coordinate = kwargs.get("coordinate")
direction = kwargs.get("direction", "down")
amount = kwargs.get("amount", 10)
self.logger.info(f"Executing computer action: {action}")
try:
if action == "screenshot":
return await self.screenshot()
elif action == "left_click" and coordinate:
x, y = coordinate
self.logger.info(f"Clicking at ({x}, {y})")
await self.computer.interface.move_cursor(x, y)
await self.computer.interface.left_click()
# Take screenshot after action
screenshot = await self.computer.interface.screenshot()
screenshot = await self.resize_screenshot_if_needed(screenshot)
return ToolResult(
output=f"Performed left click at ({x}, {y})",
base64_image=base64.b64encode(screenshot).decode(),
)
elif action == "right_click" and coordinate:
x, y = coordinate
self.logger.info(f"Right clicking at ({x}, {y})")
await self.computer.interface.move_cursor(x, y)
await self.computer.interface.right_click()
# Take screenshot after action
screenshot = await self.computer.interface.screenshot()
screenshot = await self.resize_screenshot_if_needed(screenshot)
return ToolResult(
output=f"Performed right click at ({x}, {y})",
base64_image=base64.b64encode(screenshot).decode(),
)
elif action == "double_click" and coordinate:
x, y = coordinate
self.logger.info(f"Double clicking at ({x}, {y})")
await self.computer.interface.move_cursor(x, y)
await self.computer.interface.double_click()
# Take screenshot after action
screenshot = await self.computer.interface.screenshot()
screenshot = await self.resize_screenshot_if_needed(screenshot)
return ToolResult(
output=f"Performed double click at ({x}, {y})",
base64_image=base64.b64encode(screenshot).decode(),
)
elif action == "mouse_move" and coordinate:
x, y = coordinate
self.logger.info(f"Moving cursor to ({x}, {y})")
await self.computer.interface.move_cursor(x, y)
# Take screenshot after action
screenshot = await self.computer.interface.screenshot()
screenshot = await self.resize_screenshot_if_needed(screenshot)
return ToolResult(
output=f"Moved cursor to ({x}, {y})",
base64_image=base64.b64encode(screenshot).decode(),
)
elif action == "type" and text:
self.logger.info(f"Typing text: {text}")
await self.computer.interface.type_text(text)
# Take screenshot after action
screenshot = await self.computer.interface.screenshot()
screenshot = await self.resize_screenshot_if_needed(screenshot)
return ToolResult(
output=f"Typed text: {text}",
base64_image=base64.b64encode(screenshot).decode(),
)
elif action == "key" and text:
self.logger.info(f"Pressing key: {text}")
# Handle special key combinations
if "+" in text:
keys = text.split("+")
await self.computer.interface.hotkey(*keys)
else:
await self.computer.interface.press_key(text)
# Take screenshot after action
screenshot = await self.computer.interface.screenshot()
screenshot = await self.resize_screenshot_if_needed(screenshot)
return ToolResult(
output=f"Pressed key: {text}",
base64_image=base64.b64encode(screenshot).decode(),
)
elif action == "cursor_position":
pos = await self.computer.interface.get_cursor_position()
x, y = pos
return ToolResult(output=f"X={int(x)},Y={int(y)}")
elif action == "scroll":
if direction == "down":
self.logger.info(f"Scrolling down, amount: {amount}")
for _ in range(amount):
await self.computer.interface.hotkey("fn", "down")
else:
self.logger.info(f"Scrolling up, amount: {amount}")
for _ in range(amount):
await self.computer.interface.hotkey("fn", "up")
# Take screenshot after action
screenshot = await self.computer.interface.screenshot()
screenshot = await self.resize_screenshot_if_needed(screenshot)
return ToolResult(
output=f"Scrolled {direction} by {amount} steps",
base64_image=base64.b64encode(screenshot).decode(),
)
# Default to screenshot for unimplemented actions
self.logger.warning(f"Action {action} not fully implemented, taking screenshot")
return await self.screenshot()
except Exception as e:
self.logger.error(f"Error during computer action: {str(e)}")
return ToolResult(error=f"Failed to perform {action}: {str(e)}")

View File

@@ -1,81 +0,0 @@
"""Omni tool manager implementation."""
from typing import Dict, List, Any
from enum import Enum
from computer.computer import Computer
from ....core.tools import BaseToolManager
from ....core.tools.collection import ToolCollection
from .bash import OmniBashTool
from .computer import OmniComputerTool
class ProviderType(Enum):
"""Supported provider types."""
ANTHROPIC = "anthropic"
OPENAI = "openai"
CLAUDE = "claude" # Alias for Anthropic
GPT = "gpt" # Alias for OpenAI
class OmniToolManager(BaseToolManager):
"""Tool manager for multi-provider support."""
def __init__(self, computer: Computer):
"""Initialize Omni tool manager.
Args:
computer: Computer instance for tools
"""
super().__init__(computer)
# Initialize tools
self.computer_tool = OmniComputerTool(self.computer)
self.bash_tool = OmniBashTool(self.computer)
def _initialize_tools(self) -> ToolCollection:
"""Initialize all available tools."""
return ToolCollection(self.computer_tool, self.bash_tool)
async def _initialize_tools_specific(self) -> None:
"""Initialize provider-specific tool requirements."""
await self.computer_tool.initialize_dimensions()
def get_tool_params(self) -> List[Dict[str, Any]]:
"""Get tool parameters for API calls.
Returns:
List of tool parameters in default format
"""
if self.tools is None:
raise RuntimeError("Tools not initialized. Call initialize() first.")
return self.tools.to_params()
def get_provider_tools(self, provider: ProviderType) -> List[Dict[str, Any]]:
"""Get tools formatted for a specific provider.
Args:
provider: Provider type to format tools for
Returns:
List of tool parameters in provider-specific format
"""
if self.tools is None:
raise RuntimeError("Tools not initialized. Call initialize() first.")
# Default is the base implementation
tools = self.tools.to_params()
# Customize for each provider if needed
if provider in [ProviderType.ANTHROPIC, ProviderType.CLAUDE]:
# Format for Anthropic API
# Additional adjustments can be made here
pass
elif provider in [ProviderType.OPENAI, ProviderType.GPT]:
# Format for OpenAI API
# Future implementation
pass
return tools

View File

@@ -1,157 +0,0 @@
"""Utility functions for Omni provider."""
import base64
import io
import logging
from typing import Tuple
from PIL import Image
logger = logging.getLogger(__name__)
def compress_image_base64(
base64_str: str, max_size_bytes: int = 5 * 1024 * 1024, quality: int = 90
) -> tuple[str, str]:
"""Compress a base64 encoded image to ensure it's below a certain size.
Args:
base64_str: Base64 encoded image string (with or without data URL prefix)
max_size_bytes: Maximum size in bytes (default: 5MB)
quality: Initial JPEG quality (0-100)
Returns:
tuple[str, str]: (Compressed base64 encoded image, media_type)
"""
# Handle data URL prefix if present (e.g., "data:image/png;base64,...")
original_prefix = ""
media_type = "image/png" # Default media type
if base64_str.startswith("data:"):
parts = base64_str.split(",", 1)
if len(parts) == 2:
original_prefix = parts[0] + ","
base64_str = parts[1]
# Try to extract media type from the prefix
if "image/jpeg" in original_prefix.lower():
media_type = "image/jpeg"
elif "image/png" in original_prefix.lower():
media_type = "image/png"
# Check if the base64 string is small enough already
if len(base64_str) <= max_size_bytes:
logger.info(f"Image already within size limit: {len(base64_str)} bytes")
return original_prefix + base64_str, media_type
try:
# Decode base64
img_data = base64.b64decode(base64_str)
img_size = len(img_data)
logger.info(f"Original image size: {img_size} bytes")
# Open image
img = Image.open(io.BytesIO(img_data))
# First, try to compress as PNG (maintains transparency if present)
buffer = io.BytesIO()
img.save(buffer, format="PNG", optimize=True)
buffer.seek(0)
compressed_data = buffer.getvalue()
compressed_b64 = base64.b64encode(compressed_data).decode("utf-8")
if len(compressed_b64) <= max_size_bytes:
logger.info(f"Compressed to {len(compressed_data)} bytes as PNG")
return compressed_b64, "image/png"
# Strategy 1: Try reducing quality with JPEG format
current_quality = quality
while current_quality > 20:
buffer = io.BytesIO()
# Convert to RGB if image has alpha channel (JPEG doesn't support transparency)
if img.mode in ("RGBA", "LA") or (img.mode == "P" and "transparency" in img.info):
logger.info("Converting transparent image to RGB for JPEG compression")
rgb_img = Image.new("RGB", img.size, (255, 255, 255))
rgb_img.paste(img, mask=img.split()[3] if img.mode == "RGBA" else None)
rgb_img.save(buffer, format="JPEG", quality=current_quality, optimize=True)
else:
img.save(buffer, format="JPEG", quality=current_quality, optimize=True)
buffer.seek(0)
compressed_data = buffer.getvalue()
compressed_b64 = base64.b64encode(compressed_data).decode("utf-8")
if len(compressed_b64) <= max_size_bytes:
logger.info(
f"Compressed to {len(compressed_data)} bytes with JPEG quality {current_quality}"
)
return compressed_b64, "image/jpeg"
# Reduce quality and try again
current_quality -= 10
# Strategy 2: If quality reduction isn't enough, reduce dimensions
scale_factor = 0.8
current_img = img
while scale_factor > 0.3:
# Resize image
new_width = int(img.width * scale_factor)
new_height = int(img.height * scale_factor)
current_img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
# Try with reduced size and quality
buffer = io.BytesIO()
# Convert to RGB if necessary for JPEG
if current_img.mode in ("RGBA", "LA") or (
current_img.mode == "P" and "transparency" in current_img.info
):
rgb_img = Image.new("RGB", current_img.size, (255, 255, 255))
rgb_img.paste(
current_img, mask=current_img.split()[3] if current_img.mode == "RGBA" else None
)
rgb_img.save(buffer, format="JPEG", quality=70, optimize=True)
else:
current_img.save(buffer, format="JPEG", quality=70, optimize=True)
buffer.seek(0)
compressed_data = buffer.getvalue()
compressed_b64 = base64.b64encode(compressed_data).decode("utf-8")
if len(compressed_b64) <= max_size_bytes:
logger.info(
f"Compressed to {len(compressed_data)} bytes with scale {scale_factor} and JPEG quality 70"
)
return compressed_b64, "image/jpeg"
# Reduce scale factor and try again
scale_factor -= 0.1
# If we get here, we couldn't compress enough
logger.warning("Could not compress image below required size with quality preservation")
# Last resort: Use minimum quality and size
buffer = io.BytesIO()
smallest_img = img.resize(
(int(img.width * 0.5), int(img.height * 0.5)), Image.Resampling.LANCZOS
)
# Convert to RGB if necessary
if smallest_img.mode in ("RGBA", "LA") or (
smallest_img.mode == "P" and "transparency" in smallest_img.info
):
rgb_img = Image.new("RGB", smallest_img.size, (255, 255, 255))
rgb_img.paste(
smallest_img, mask=smallest_img.split()[3] if smallest_img.mode == "RGBA" else None
)
rgb_img.save(buffer, format="JPEG", quality=20, optimize=True)
else:
smallest_img.save(buffer, format="JPEG", quality=20, optimize=True)
buffer.seek(0)
final_data = buffer.getvalue()
final_b64 = base64.b64encode(final_data).decode("utf-8")
logger.warning(f"Final compressed size: {len(final_b64)} bytes (may still exceed limit)")
return final_b64, "image/jpeg"
except Exception as e:
logger.error(f"Error compressing image: {str(e)}")
raise
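The deleted helper implemented a three-step fallback ladder: lossless PNG optimization, then a descending-quality JPEG loop, then proportional downscaling as a last resort. A compact sketch of the same idea, assuming Pillow; this is an illustration of the strategy, not the removed function verbatim:

import base64
import io
from PIL import Image

def shrink_to_limit(b64: str, limit: int = 5 * 1024 * 1024) -> str:
    """Compress a base64 image until the encoded string fits under limit."""
    if len(b64) <= limit:
        return b64
    img = Image.open(io.BytesIO(base64.b64decode(b64)))
    if img.mode != "RGB":
        img = img.convert("RGB")  # JPEG has no alpha channel
    # Step 1: walk JPEG quality down before touching the dimensions.
    for quality in range(90, 10, -10):
        buf = io.BytesIO()
        img.save(buf, format="JPEG", quality=quality, optimize=True)
        out = base64.b64encode(buf.getvalue()).decode("utf-8")
        if len(out) <= limit:
            return out
    # Step 2: halve the dimensions, mirroring the deleted last-resort branch.
    img = img.resize((img.width // 2, img.height // 2), Image.Resampling.LANCZOS)
    buf = io.BytesIO()
    img.save(buf, format="JPEG", quality=20, optimize=True)
    return base64.b64encode(buf.getvalue()).decode("utf-8")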

View File

@@ -1,23 +0,0 @@
"""Type definitions for the agent package."""
from .base import HostConfig, TaskResult, Annotation
from .messages import Message, Request, Response, StepMessage, DisengageMessage
from .tools import ToolInvocation, ToolInvocationState, ClientAttachment, ToolResult
__all__ = [
# Base types
"HostConfig",
"TaskResult",
"Annotation",
# Message types
"Message",
"Request",
"Response",
"StepMessage",
"DisengageMessage",
# Tool types
"ToolInvocation",
"ToolInvocationState",
"ClientAttachment",
"ToolResult",
]

View File

@@ -1,41 +0,0 @@
"""Base type definitions."""
from enum import Enum, auto
from typing import Dict, Any
from pydantic import BaseModel, ConfigDict
class HostConfig(BaseModel):
"""Host configuration."""
model_config = ConfigDict(extra="forbid")
hostname: str
port: int
@property
def address(self) -> str:
return f"{self.hostname}:{self.port}"
class TaskResult(BaseModel):
"""Result of a task execution."""
model_config = ConfigDict(extra="forbid")
result: str
vnc_password: str
class Annotation(BaseModel):
"""Annotation metadata."""
model_config = ConfigDict(extra="forbid")
id: str
vm_url: str
class AgentLoop(Enum):
"""Enumeration of available loop types."""
ANTHROPIC = auto() # Anthropic implementation
OMNI = auto() # OmniLoop implementation
# Add more loop types as needed

View File

@@ -1,36 +0,0 @@
"""Message-related type definitions."""
from typing import List, Dict, Any, Optional
from pydantic import BaseModel, ConfigDict
from .tools import ToolInvocation
class Message(BaseModel):
"""Base message type."""
model_config = ConfigDict(extra='forbid')
role: str
content: str
annotations: Optional[List[Dict[str, Any]]] = None
toolInvocations: Optional[List[ToolInvocation]] = None
data: Optional[List[Dict[str, Any]]] = None
errors: Optional[List[str]] = None
class Request(BaseModel):
"""Request type."""
model_config = ConfigDict(extra='forbid')
messages: List[Message]
selectedModel: str
class Response(BaseModel):
"""Response type."""
model_config = ConfigDict(extra='forbid')
messages: List[Message]
vm_url: str
class StepMessage(Message):
"""Message for a single step."""
pass
class DisengageMessage(BaseModel):
"""Message indicating disengagement."""
pass