Converge to tool-based

This commit is contained in:
f-trycua
2025-03-24 20:05:06 +01:00
parent 167765a4d6
commit c20b61c389
18 changed files with 1741 additions and 699 deletions

View File

@@ -3,7 +3,6 @@
import asyncio
import logging
import traceback
from pathlib import Path
import signal
from computer import Computer
@@ -51,7 +50,8 @@ async def run_agent_example():
for i, task in enumerate(tasks):
print(f"\nExecuting task {i}/{len(tasks)}: {task}")
async for result in agent.run(task):
print(result)
# print(result)
pass
print(f"\n✅ Task {i+1}/{len(tasks)} completed: {task}")

View File

@@ -12,6 +12,7 @@ from ..providers.omni.parser import OmniParser
from ..providers.omni.types import LLMProvider, LLM
from .. import AgentLoop
from .messages import StandardMessageManager, ImageRetentionConfig
from .types import AgentResponse
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@@ -64,7 +65,7 @@ class ComputerAgent:
"""
# Basic agent configuration
self.max_retries = max_retries
self.computer = computer or Computer()
self.computer = computer
self.queue = asyncio.Queue()
self.screenshot_dir = screenshot_dir
self.log_dir = log_dir
@@ -72,11 +73,6 @@ class ComputerAgent:
self._initialized = False
self._in_context = False
# Initialize the message manager for standardized message handling
self.message_manager = StandardMessageManager(
config=ImageRetentionConfig(num_images_to_keep=only_n_most_recent_images)
)
# Set logging level
logger.setLevel(verbosity)
@@ -103,7 +99,7 @@ class ComputerAgent:
)
# Ensure computer is properly cast for typing purposes
computer_instance = cast(Computer, self.computer)
computer_instance = self.computer
# Get API key from environment if not provided
actual_api_key = api_key or os.environ.get(ENV_VARS[self.provider], "")
@@ -132,6 +128,9 @@ class ComputerAgent:
parser=OmniParser(),
)
# Initialize the message manager from the loop
self.message_manager = self._loop.message_manager
logger.info(
f"ComputerAgent initialized with provider: {self.provider}, model: {actual_model_name}"
)
@@ -200,26 +199,14 @@ class ComputerAgent:
await self.computer.run()
self._initialized = True
async def _init_if_needed(self):
"""Initialize the computer interface if it hasn't been initialized yet."""
if not self.computer._initialized:
logger.info("Computer not initialized, initializing now...")
try:
# Call run directly
await self.computer.run()
logger.info("Computer interface initialized successfully")
except Exception as e:
logger.error(f"Error initializing computer interface: {str(e)}")
raise
async def run(self, task: str) -> AsyncGenerator[Dict[str, Any], None]:
async def run(self, task: str) -> AsyncGenerator[AgentResponse, None]:
"""Run a task using the computer agent.
Args:
task: Task description
Yields:
Task execution updates
Agent response format
"""
try:
logger.info(f"Running task: {task}")
@@ -237,12 +224,6 @@ class ComputerAgent:
f"Added task message. Message history now has {len(self.message_manager.messages)} messages"
)
# Log message history types to help with debugging
message_types = [
f"{i}: {msg['role']}" for i, msg in enumerate(self.message_manager.messages)
]
logger.info(f"Message history roles: {', '.join(message_types)}")
# Pass properly formatted messages to the loop
if self._loop is None:
logger.error("Loop not initialized properly")
@@ -251,27 +232,9 @@ class ComputerAgent:
# Execute the task and yield results
async for result in self._loop.run(self.message_manager.messages):
# Extract the assistant message from the result and add it to our history
assistant_response = result["response"]["choices"][0].get("message", None)
if assistant_response and assistant_response.get("role") == "assistant":
# Extract the content from the assistant response
content = assistant_response.get("content")
self.message_manager.add_assistant_message(content)
logger.info("Added assistant response to message history")
# Yield the result to the caller
yield result
# Logging the message history for debugging
logger.info(
f"Updated message history now has {len(self.message_manager.messages)} messages"
)
message_types = [
f"{i}: {msg['role']}" for i, msg in enumerate(self.message_manager.messages)
]
logger.info(f"Updated message history roles: {', '.join(message_types)}")
except Exception as e:
logger.error(f"Error in agent run method: {str(e)}")
yield {

View File

@@ -9,6 +9,8 @@ from datetime import datetime
from computer import Computer
from .experiment import ExperimentManager
from .messages import StandardMessageManager, ImageRetentionConfig
from .types import AgentResponse
logger = logging.getLogger(__name__)
@@ -65,8 +67,6 @@ class BaseLoop(ABC):
self.save_trajectory = save_trajectory
self.only_n_most_recent_images = only_n_most_recent_images
self._kwargs = kwargs
self.message_history = []
# self.tool_manager = BaseToolManager(computer)
# Initialize experiment manager
if self.save_trajectory and self.base_dir:
@@ -125,7 +125,7 @@ class BaseLoop(ABC):
raise NotImplementedError
@abstractmethod
async def run(self, messages: List[Dict[str, Any]]) -> AsyncGenerator[Dict[str, Any], None]:
async def run(self, messages: List[Dict[str, Any]]) -> AsyncGenerator[AgentResponse, None]:
"""Run the agent loop with provided messages.
This method handles the main agent loop including message processing,
@@ -135,7 +135,7 @@ class BaseLoop(ABC):
messages: List of message objects
Yields:
Dict containing response data
Agent response format
"""
raise NotImplementedError

View File

@@ -397,219 +397,3 @@ class StandardMessageManager:
result.append(openai_msg)
return result
async def create_openai_compatible_response(
self,
response: Any,
messages: List[Dict[str, Any]],
parsed_screen: Optional[ParseResult] = None,
parser: Optional[Any] = None,
model: Optional[str] = None,
) -> Dict[str, Any]:
"""Create an OpenAI computer use agent compatible response format.
Args:
response: The original API response
messages: List of messages in standard OpenAI format
parsed_screen: Optional pre-parsed screen information
parser: Optional parser instance for coordinate calculation
model: Optional model name
Returns:
A response formatted according to OpenAI's computer use agent standard
"""
from datetime import datetime
import time
# Create a unique ID for this response
response_id = f"resp_{datetime.now().strftime('%Y%m%d%H%M%S')}_{id(response)}"
reasoning_id = f"rs_{response_id}"
action_id = f"cu_{response_id}"
call_id = f"call_{response_id}"
# Extract the last assistant message
assistant_msg = None
for msg in reversed(messages):
if msg["role"] == "assistant":
assistant_msg = msg
break
if not assistant_msg:
# If no assistant message found, create a default one
assistant_msg = {"role": "assistant", "content": "No response available"}
# Initialize output array
output_items = []
# Extract reasoning and action details from the response
content = assistant_msg["content"]
reasoning_text = None
action_details = None
for item in content:
if isinstance(item, dict) and item.get("type") == "text":
try:
# Try to parse JSON from text block
text_content = item.get("text", "")
parsed_json = json.loads(text_content)
# Get reasoning text
if reasoning_text is None:
reasoning_text = parsed_json.get("Explanation", "")
# Extract action details
action = parsed_json.get("Action", "").lower()
text_input = parsed_json.get("Text", "")
value = parsed_json.get("Value", "") # Also handle Value field
box_id = parsed_json.get("Box ID") # Extract Box ID
if action in ["click", "left_click"]:
# Always calculate coordinates from Box ID for click actions
x, y = 100, 100 # Default fallback values
if parsed_screen and box_id is not None and parser is not None:
try:
box_id_int = (
box_id
if isinstance(box_id, int)
else int(str(box_id)) if str(box_id).isdigit() else None
)
if box_id_int is not None:
# Use the parser's method to calculate coordinates
x, y = await parser.calculate_click_coordinates(
box_id_int, parsed_screen
)
logger.info(
f"Extracted coordinates for Box ID {box_id_int}: ({x}, {y})"
)
except Exception as e:
logger.error(
f"Error extracting coordinates for Box ID {box_id}: {str(e)}"
)
action_details = {
"type": "click",
"button": "left",
"box_id": (
(
box_id
if isinstance(box_id, int)
else int(box_id) if str(box_id).isdigit() else None
)
if box_id is not None
else None
),
"x": x,
"y": y,
}
elif action in ["type", "type_text"] and (text_input or value):
action_details = {
"type": "type",
"text": text_input or value,
}
elif action == "hotkey" and value:
action_details = {
"type": "hotkey",
"keys": value,
}
elif action == "scroll":
# Use default coordinates for scrolling
delta_x = 0
delta_y = 0
# Try to extract scroll delta values from content if available
scroll_data = parsed_json.get("Scroll", {})
if scroll_data:
delta_x = scroll_data.get("delta_x", 0)
delta_y = scroll_data.get("delta_y", 0)
action_details = {
"type": "scroll",
"x": 100,
"y": 100,
"scroll_x": delta_x,
"scroll_y": delta_y,
}
elif action == "none":
# Handle case when action is None (task completion)
action_details = {"type": "none", "description": "Task completed"}
except json.JSONDecodeError:
# If not JSON, just use as reasoning text
if reasoning_text is None:
reasoning_text = ""
reasoning_text += item.get("text", "")
# Add reasoning item if we have text content
if reasoning_text:
output_items.append(
{
"type": "reasoning",
"id": reasoning_id,
"summary": [
{
"type": "summary_text",
"text": reasoning_text[:200], # Truncate to reasonable length
}
],
}
)
# If no action details extracted, use default
if not action_details:
action_details = {
"type": "click",
"button": "left",
"x": 100,
"y": 100,
}
# Add computer_call item
computer_call = {
"type": "computer_call",
"id": action_id,
"call_id": call_id,
"action": action_details,
"pending_safety_checks": [],
"status": "completed",
}
output_items.append(computer_call)
# Create the OpenAI-compatible response format with all expected fields
return {
"id": response_id,
"object": "response",
"created_at": int(time.time()),
"status": "completed",
"error": None,
"incomplete_details": None,
"instructions": None,
"max_output_tokens": None,
"model": model or "unknown",
"output": output_items,
"parallel_tool_calls": True,
"previous_response_id": None,
"reasoning": {"effort": "medium", "generate_summary": "concise"},
"store": True,
"temperature": 1.0,
"text": {"format": {"type": "text"}},
"tool_choice": "auto",
"tools": [
{
"type": "computer_use_preview",
"display_height": 768,
"display_width": 1024,
"environment": "mac",
}
],
"top_p": 1.0,
"truncation": "auto",
"usage": {
"input_tokens": 0, # Placeholder values
"input_tokens_details": {"cached_tokens": 0},
"output_tokens": 0, # Placeholder values
"output_tokens_details": {"reasoning_tokens": 0},
"total_tokens": 0, # Placeholder values
},
"user": None,
"metadata": {},
# Include the original response for backward compatibility
"response": {"choices": [{"message": assistant_msg, "finish_reason": "stop"}]},
}

View File

@@ -0,0 +1,35 @@
"""Core type definitions."""
from typing import Any, Dict, List, Optional, TypedDict, Union
class AgentResponse(TypedDict, total=False):
"""Agent response format."""
id: str
object: str
created_at: int
status: str
error: Optional[str]
incomplete_details: Optional[Any]
instructions: Optional[Any]
max_output_tokens: Optional[int]
model: str
output: List[Dict[str, Any]]
parallel_tool_calls: bool
previous_response_id: Optional[str]
reasoning: Dict[str, str]
store: bool
temperature: float
text: Dict[str, Dict[str, str]]
tool_choice: str
tools: List[Dict[str, Union[str, int]]]
top_p: float
truncation: str
usage: Dict[str, Any]
user: Optional[str]
metadata: Dict[str, Any]
response: Dict[str, List[Dict[str, Any]]]
# Additional fields for error responses
role: str
content: Union[str, List[Dict[str, Any]]]

View File

@@ -1,4 +1,4 @@
from typing import Any
from typing import Any, List, Dict, cast
import httpx
import asyncio
from anthropic import Anthropic, AnthropicBedrock, AnthropicVertex
@@ -80,6 +80,147 @@ class BaseAnthropicClient:
f"Failed after {self.MAX_RETRIES} retries. " f"Last error: {str(last_error)}"
)
async def run_interleaved(
self, messages: List[Dict[str, Any]], system: str, max_tokens: int = 4096
) -> Any:
"""Run the Anthropic API with the Claude model, supports interleaved tool calling.
Args:
messages: List of message objects
system: System prompt
max_tokens: Maximum tokens to generate
Returns:
API response
"""
# Add the tool_result check/fix logic here
fixed_messages = self._fix_missing_tool_results(messages)
# Get model name from concrete implementation if available
model_name = getattr(self, "model", "unknown model")
logger.info(f"Running Anthropic API call with model {model_name}")
retry_count = 0
while retry_count < self.MAX_RETRIES:
try:
# Call the Anthropic API through create_message which is implemented by subclasses
# Convert system str to the list format expected by create_message
system_list = [system]
# Convert message format if needed - concrete implementations may do further conversion
response = await self.create_message(
messages=cast(list[BetaMessageParam], fixed_messages),
system=system_list,
tools=[], # Tools are included in the messages
max_tokens=max_tokens,
betas=["tools-2023-12-13"],
)
logger.info(f"Anthropic API call successful")
return response
except Exception as e:
retry_count += 1
wait_time = self.INITIAL_RETRY_DELAY * (
2 ** (retry_count - 1)
) # Exponential backoff
logger.info(
f"Retrying request (attempt {retry_count}/{self.MAX_RETRIES}) in {wait_time:.2f} seconds after error: {str(e)}"
)
await asyncio.sleep(wait_time)
# If we get here, all retries failed
raise RuntimeError(f"Failed to call Anthropic API after {self.MAX_RETRIES} attempts")
def _fix_missing_tool_results(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Check for and fix any missing tool_result blocks after tool_use blocks.
Args:
messages: List of message objects
Returns:
Fixed messages with proper tool_result blocks
"""
fixed_messages = []
pending_tool_uses = {} # Map of tool_use IDs to their details
for i, message in enumerate(messages):
# Track any tool_use blocks in this message
if message.get("role") == "assistant" and "content" in message:
content = message.get("content", [])
for block in content:
if isinstance(block, dict) and block.get("type") == "tool_use":
tool_id = block.get("id")
if tool_id:
pending_tool_uses[tool_id] = {
"name": block.get("name", ""),
"input": block.get("input", {}),
}
# Check if this message handles any pending tool_use blocks
if message.get("role") == "user" and "content" in message:
# Check for tool_result blocks in this message
content = message.get("content", [])
for block in content:
if isinstance(block, dict) and block.get("type") == "tool_result":
tool_id = block.get("tool_use_id")
if tool_id in pending_tool_uses:
# This tool_result handles a pending tool_use
pending_tool_uses.pop(tool_id)
# Add the message to our fixed list
fixed_messages.append(message)
# If this is an assistant message with tool_use blocks and there are
# pending tool uses that need to be resolved before the next assistant message
if (
i + 1 < len(messages)
and message.get("role") == "assistant"
and messages[i + 1].get("role") == "assistant"
and pending_tool_uses
):
# We need to insert a user message with tool_results for all pending tool_uses
tool_results = []
for tool_id, tool_info in pending_tool_uses.items():
tool_results.append(
{
"type": "tool_result",
"tool_use_id": tool_id,
"content": {
"type": "error",
"message": "Tool execution was skipped or failed",
},
}
)
# Insert a synthetic user message with the tool results
if tool_results:
fixed_messages.append({"role": "user", "content": tool_results})
# Clear pending tools since we've added results for them
pending_tool_uses = {}
# Check if there are any remaining pending tool_uses at the end of the conversation
if pending_tool_uses and fixed_messages and fixed_messages[-1].get("role") == "assistant":
# Add a final user message with tool results for any pending tool_uses
tool_results = []
for tool_id, tool_info in pending_tool_uses.items():
tool_results.append(
{
"type": "tool_result",
"tool_use_id": tool_id,
"content": {
"type": "error",
"message": "Tool execution was skipped or failed",
},
}
)
if tool_results:
fixed_messages.append({"role": "user", "content": tool_results})
return fixed_messages
class AnthropicDirectClient(BaseAnthropicClient):
"""Direct Anthropic API client implementation."""

View File

@@ -18,6 +18,7 @@ from computer import Computer
# Base imports
from ...core.loop import BaseLoop
from ...core.messages import StandardMessageManager, ImageRetentionConfig
from ...core.types import AgentResponse
# Anthropic provider-specific imports
from .api.client import AnthropicClientFactory, BaseAnthropicClient
@@ -25,6 +26,7 @@ from .tools.manager import ToolManager
from .prompts import SYSTEM_PROMPT
from .types import LLMProvider
from .tools import ToolResult
from .utils import to_anthropic_format, to_agent_response_format
# Import the new modules we created
from .api_handler import AnthropicAPIHandler
@@ -87,22 +89,18 @@ class AnthropicLoop(BaseLoop):
**kwargs,
)
# Initialize message manager
self.message_manager = StandardMessageManager(
config=ImageRetentionConfig(num_images_to_keep=only_n_most_recent_images)
)
# Anthropic-specific attributes
self.provider = LLMProvider.ANTHROPIC
self.client = None
self.retry_count = 0
self.tool_manager = None
self.callback_manager = None
# Initialize standard message manager with image retention config
self.message_manager = StandardMessageManager(
config=ImageRetentionConfig(
num_images_to_keep=only_n_most_recent_images, enable_caching=True
)
)
# Message history (standard OpenAI format)
self.message_history = []
self.queue = asyncio.Queue() # Initialize queue
# Initialize handlers
self.api_handler = AnthropicAPIHandler(self)
@@ -147,25 +145,18 @@ class AnthropicLoop(BaseLoop):
# MAIN LOOP - IMPLEMENTING ABSTRACT METHOD
###########################################
async def run(self, messages: List[Dict[str, Any]]) -> AsyncGenerator[Dict[str, Any], None]:
async def run(self, messages: List[Dict[str, Any]]) -> AsyncGenerator[AgentResponse, None]:
"""Run the agent loop with provided messages.
Implements abstract method from BaseLoop to handle the main agent loop
for the AnthropicLoop implementation, using async queues and callbacks.
Args:
messages: List of message objects in standard OpenAI format
Yields:
Dict containing response data
Agent response format
"""
try:
logger.info("Starting Anthropic loop run")
# Reset message history and add new messages in standard format
self.message_history = []
self.message_history.extend(messages)
# Create queue for response streaming
queue = asyncio.Queue()
@@ -178,7 +169,7 @@ class AnthropicLoop(BaseLoop):
logger.info("Client initialized successfully")
# Start loop in background task
loop_task = asyncio.create_task(self._run_loop(queue))
loop_task = asyncio.create_task(self._run_loop(queue, messages))
# Process and yield messages as they arrive
while True:
@@ -214,11 +205,12 @@ class AnthropicLoop(BaseLoop):
# AGENT LOOP IMPLEMENTATION
###########################################
async def _run_loop(self, queue: asyncio.Queue) -> None:
"""Run the agent loop with current message history.
async def _run_loop(self, queue: asyncio.Queue, messages: List[Dict[str, Any]]) -> None:
"""Run the agent loop with provided messages.
Args:
queue: Queue for response streaming
messages: List of messages in standard OpenAI format
"""
try:
while True:
@@ -226,28 +218,38 @@ class AnthropicLoop(BaseLoop):
try:
# Take screenshot - always returns raw PNG bytes
screenshot = await self.computer.interface.screenshot()
logger.info("Screenshot captured successfully")
# Convert PNG bytes to base64
base64_image = base64.b64encode(screenshot).decode("utf-8")
logger.info(f"Screenshot converted to base64 (size: {len(base64_image)} bytes)")
# Save screenshot if requested
if self.save_trajectory and self.experiment_manager:
try:
self._save_screenshot(base64_image, action_type="state")
logger.info("Screenshot saved to trajectory")
except Exception as e:
logger.error(f"Error saving screenshot: {str(e)}")
# Add screenshot to message history in OpenAI format
# Create screenshot message
screen_info_msg = {
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {"url": f"data:image/png;base64,{base64_image}"},
"type": "image",
"source": {
"type": "base64",
"media_type": "image/png",
"data": base64_image,
},
}
],
}
self.message_history.append(screen_info_msg)
# Add screenshot to messages
messages.append(screen_info_msg)
logger.info("Screenshot message added to conversation")
except Exception as e:
logger.error(f"Error capturing or processing screenshot: {str(e)}")
raise
@@ -255,10 +257,8 @@ class AnthropicLoop(BaseLoop):
# Create new turn directory for this API call
self._create_turn_dir()
# Convert standard messages to Anthropic format
anthropic_messages, system_content = self.message_manager.to_anthropic_format(
self.message_history.copy()
)
# Convert standard messages to Anthropic format using utility function
anthropic_messages, system_content = to_anthropic_format(messages.copy())
# Use API handler to make API call with Anthropic format
response = await self.api_handler.make_api_call(
@@ -266,26 +266,23 @@ class AnthropicLoop(BaseLoop):
system_prompt=system_content or SYSTEM_PROMPT,
)
# Use response handler to handle the response and convert to standard format
# This adds the response to message_history
if not await self.response_handler.handle_response(response, self.message_history):
break
# Use response handler to handle the response and get new messages
new_messages, should_continue = await self.response_handler.handle_response(
response, messages
)
# Get the last assistant message and convert it to OpenAI computer use format
for msg in reversed(self.message_history):
if msg["role"] == "assistant":
# Create OpenAI-compatible response and add to queue
openai_compatible_response = (
await self.message_manager.create_openai_compatible_response(
response=response,
messages=self.message_history,
parsed_screen=None,
parser=None,
model=self.model,
)
)
await queue.put(openai_compatible_response)
break
# Add new messages to the parent's message history
messages.extend(new_messages)
openai_compatible_response = await to_agent_response_format(
response,
messages,
model=self.model,
)
await queue.put(openai_compatible_response)
if not should_continue:
break
# Signal completion
await queue.put(None)
@@ -306,35 +303,42 @@ class AnthropicLoop(BaseLoop):
###########################################
async def _handle_response(self, response: BetaMessage, messages: List[Dict[str, Any]]) -> bool:
"""Handle the Anthropic API response.
"""Handle a response from the Anthropic API.
Args:
response: API response
messages: List of messages to update in standard OpenAI format
response: The response from the Anthropic API
messages: The message history
Returns:
True if the loop should continue, False otherwise
bool: Whether to continue the conversation
"""
try:
# Convert Anthropic response to standard OpenAI format
response_blocks = self._response_to_blocks(response)
# Convert response to standard format
openai_compatible_response = await to_agent_response_format(
response,
messages,
model=self.model,
)
# Add response to standard message history
messages.append({"role": "assistant", "content": response_blocks})
# Put the response on the queue
await self.queue.put(openai_compatible_response)
if self.callback_manager is None:
raise RuntimeError(
"Callback manager not initialized. Call initialize_client() first."
)
# Handle tool use blocks and collect results
# Handle tool use blocks and collect ALL results before adding to messages
tool_result_content = []
has_tool_use = False
for content_block in response.content:
# Notify callback of content
self.callback_manager.on_content(cast(BetaContentBlockParam, content_block))
# Handle tool use - carefully check and access attributes
if hasattr(content_block, "type") and content_block.type == "tool_use":
has_tool_use = True
if self.tool_manager is None:
raise RuntimeError(
"Tool manager not initialized. Call initialize_client() first."
@@ -357,16 +361,38 @@ class AnthropicLoop(BaseLoop):
# Notify callback of tool result
self.callback_manager.on_tool_result(cast(ToolResult, result), tool_id)
# If no tool results, we're done
if not tool_result_content:
# Signal completion
# If we had any tool_use blocks, we MUST add the tool_result message
# even if there were errors or no actual results
if has_tool_use:
# If somehow we have no tool results but had tool uses, add synthetic error results
if not tool_result_content:
logger.warning(
"Had tool uses but no tool results, adding synthetic error results"
)
for content_block in response.content:
if hasattr(content_block, "type") and content_block.type == "tool_use":
tool_id = getattr(content_block, "id", "")
if tool_id:
tool_result_content.append(
{
"type": "tool_result",
"tool_use_id": tool_id,
"content": {
"type": "error",
"text": "Tool execution was skipped or failed",
},
"is_error": True,
}
)
# Add ALL tool results as a SINGLE user message
messages.append({"role": "user", "content": tool_result_content})
return True
else:
# No tool uses, we're done
self.callback_manager.on_content({"type": "text", "text": "<DONE>"})
return False
# Add tool results to message history in standard format
messages.append({"role": "user", "content": tool_result_content})
return True
except Exception as e:
logger.error(f"Error handling response: {str(e)}")
messages.append(

View File

@@ -28,17 +28,23 @@ class AnthropicResponseHandler:
"""
self.loop = loop
async def handle_response(self, response: BetaMessage, messages: List[Dict[str, Any]]) -> bool:
async def handle_response(
self, response: BetaMessage, messages: List[Dict[str, Any]]
) -> Tuple[List[Dict[str, Any]], bool]:
"""Handle the Anthropic API response.
Args:
response: API response
messages: List of messages to update
messages: List of messages for context
Returns:
True if the loop should continue, False otherwise
Tuple containing:
- List of new messages to be added
- Boolean indicating if the loop should continue
"""
try:
new_messages = []
# Convert response to parameter format
response_params = self.response_to_params(response)
@@ -64,8 +70,8 @@ class AnthropicResponseHandler:
logger.info(f"Existing tool_use IDs in conversation: {existing_tool_use_ids}")
logger.info(f"New tool_use IDs in current response: {current_tool_use_ids}")
# Add response to messages
messages.append(
# Create assistant message
new_messages.append(
{
"role": "assistant",
"content": response_params,
@@ -116,21 +122,21 @@ class AnthropicResponseHandler:
if not tool_result_content:
# Signal completion
self.loop.callback_manager.on_content({"type": "text", "text": "<DONE>"})
return False
return new_messages, False
# Add tool results to message history
messages.append({"content": tool_result_content, "role": "user"})
return True
# Add tool results as user message
new_messages.append({"content": tool_result_content, "role": "user"})
return new_messages, True
except Exception as e:
logger.error(f"Error handling response: {str(e)}")
messages.append(
new_messages.append(
{
"role": "assistant",
"content": f"Error: {str(e)}",
}
)
return False
return new_messages, False
def response_to_params(
self,

View File

@@ -0,0 +1,370 @@
"""Utility functions for Anthropic message handling."""
import time
import logging
import re
from typing import Any, Dict, List, Optional, Tuple, cast
from anthropic.types.beta import BetaMessage, BetaMessageParam, BetaTextBlock
from ..omni.parser import ParseResult
from ...core.types import AgentResponse
from datetime import datetime
import json
# Configure module logger
logger = logging.getLogger(__name__)
def to_anthropic_format(
messages: List[Dict[str, Any]],
) -> Tuple[List[Dict[str, Any]], str]:
"""Convert standard OpenAI format messages to Anthropic format.
Args:
messages: List of messages in OpenAI format
Returns:
Tuple containing (anthropic_messages, system_content)
"""
result = []
system_content = ""
# Process messages in order to maintain conversation flow
previous_assistant_tool_use_ids = set() # Track tool_use_ids in the previous assistant message
for i, msg in enumerate(messages):
role = msg.get("role", "")
content = msg.get("content", "")
if role == "system":
# Collect system messages for later use
system_content += content + "\n"
continue
if role == "assistant":
# Track tool_use_ids in this assistant message for the next user message
previous_assistant_tool_use_ids = set()
if isinstance(content, list):
for item in content:
if isinstance(item, dict) and item.get("type") == "tool_use" and "id" in item:
previous_assistant_tool_use_ids.add(item["id"])
if role in ["user", "assistant"]:
anthropic_msg = {"role": role}
# Convert content based on type
if isinstance(content, str):
# Simple text content
anthropic_msg["content"] = [{"type": "text", "text": content}]
elif isinstance(content, list):
# Convert complex content
anthropic_content = []
for item in content:
item_type = item.get("type", "")
if item_type == "text":
anthropic_content.append({"type": "text", "text": item.get("text", "")})
elif item_type == "image_url":
# Convert OpenAI image format to Anthropic
image_url = item.get("image_url", {}).get("url", "")
if image_url.startswith("data:"):
# Extract base64 data and media type
match = re.match(r"data:(.+);base64,(.+)", image_url)
if match:
media_type, data = match.groups()
anthropic_content.append(
{
"type": "image",
"source": {
"type": "base64",
"media_type": media_type,
"data": data,
},
}
)
else:
# Regular URL
anthropic_content.append(
{
"type": "image",
"source": {
"type": "url",
"url": image_url,
},
}
)
elif item_type == "tool_use":
# Always include tool_use blocks
anthropic_content.append(item)
elif item_type == "tool_result":
# Check if this is a user message AND if the tool_use_id exists in the previous assistant message
tool_use_id = item.get("tool_use_id")
# Only include tool_result if it references a tool_use from the immediately preceding assistant message
if (
role == "user"
and tool_use_id
and tool_use_id in previous_assistant_tool_use_ids
):
anthropic_content.append(item)
else:
content_text = "Tool Result: "
if "content" in item:
if isinstance(item["content"], list):
for content_item in item["content"]:
if (
isinstance(content_item, dict)
and content_item.get("type") == "text"
):
content_text += content_item.get("text", "")
elif isinstance(item["content"], str):
content_text += item["content"]
anthropic_content.append({"type": "text", "text": content_text})
anthropic_msg["content"] = anthropic_content
result.append(anthropic_msg)
return result, system_content
def from_anthropic_format(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Convert Anthropic format messages to standard OpenAI format.
Args:
messages: List of messages in Anthropic format
Returns:
List of messages in OpenAI format
"""
result = []
for msg in messages:
role = msg.get("role", "")
content = msg.get("content", [])
if role in ["user", "assistant"]:
openai_msg = {"role": role}
# Simple case: single text block
if len(content) == 1 and content[0].get("type") == "text":
openai_msg["content"] = content[0].get("text", "")
else:
# Complex case: multiple blocks or non-text
openai_content = []
for item in content:
item_type = item.get("type", "")
if item_type == "text":
openai_content.append({"type": "text", "text": item.get("text", "")})
elif item_type == "image":
# Convert Anthropic image to OpenAI format
source = item.get("source", {})
if source.get("type") == "base64":
media_type = source.get("media_type", "image/png")
data = source.get("data", "")
openai_content.append(
{
"type": "image_url",
"image_url": {"url": f"data:{media_type};base64,{data}"},
}
)
else:
# URL
openai_content.append(
{
"type": "image_url",
"image_url": {"url": source.get("url", "")},
}
)
elif item_type in ["tool_use", "tool_result"]:
# Pass through tool-related content
openai_content.append(item)
openai_msg["content"] = openai_content
result.append(openai_msg)
return result
async def to_agent_response_format(
response: BetaMessage,
messages: List[Dict[str, Any]],
parsed_screen: Optional[ParseResult] = None,
parser: Optional[Any] = None,
model: Optional[str] = None,
) -> AgentResponse:
"""Convert an Anthropic response to the standard agent response format.
Args:
response: The Anthropic API response (BetaMessage)
messages: List of messages in standard format
parsed_screen: Optional pre-parsed screen information
parser: Optional parser instance for coordinate calculation
model: Optional model name
Returns:
A response formatted according to the standard agent response format
"""
# Create unique IDs for this response
response_id = f"resp_{datetime.now().strftime('%Y%m%d%H%M%S')}_{id(response)}"
reasoning_id = f"rs_{response_id}"
action_id = f"cu_{response_id}"
call_id = f"call_{response_id}"
# Extract content and reasoning from Anthropic response
content = []
reasoning_text = None
action_details = None
for block in response.content:
if block.type == "text":
# Use the first text block as reasoning
if reasoning_text is None:
reasoning_text = block.text
content.append({"type": "text", "text": block.text})
elif block.type == "tool_use" and block.name == "computer":
try:
input_dict = cast(Dict[str, Any], block.input)
action = input_dict.get("action", "").lower()
# Extract coordinates from coordinate list if provided
coordinates = input_dict.get("coordinate", [100, 100])
x, y = coordinates if len(coordinates) == 2 else (100, 100)
if action == "screenshot":
action_details = {
"type": "screenshot",
}
elif action in ["click", "left_click", "right_click", "double_click"]:
action_details = {
"type": "click",
"button": "left" if action in ["click", "left_click"] else "right",
"double": action == "double_click",
"x": x,
"y": y,
}
elif action == "type":
action_details = {
"type": "type",
"text": input_dict.get("text", ""),
}
elif action == "key":
action_details = {
"type": "hotkey",
"keys": [input_dict.get("text", "")],
}
elif action == "scroll":
scroll_amount = input_dict.get("scroll_amount", 1)
scroll_direction = input_dict.get("scroll_direction", "down")
delta_y = scroll_amount if scroll_direction == "down" else -scroll_amount
action_details = {
"type": "scroll",
"x": x,
"y": y,
"delta_x": 0,
"delta_y": delta_y,
}
elif action == "move":
action_details = {
"type": "move",
"x": x,
"y": y,
}
except Exception as e:
logger.error(f"Error extracting action details: {str(e)}")
# Create output items with reasoning
output_items = []
if reasoning_text:
output_items.append(
{
"type": "reasoning",
"id": reasoning_id,
"summary": [
{
"type": "summary_text",
"text": reasoning_text,
}
],
}
)
# Add computer_call item with extracted or default action
computer_call = {
"type": "computer_call",
"id": action_id,
"call_id": call_id,
"action": action_details or {"type": "none", "description": "No action specified"},
"pending_safety_checks": [],
"status": "completed",
}
output_items.append(computer_call)
# Create the standard response format
standard_response = {
"id": response_id,
"object": "response",
"created_at": int(datetime.now().timestamp()),
"status": "completed",
"error": None,
"incomplete_details": None,
"instructions": None,
"max_output_tokens": None,
"model": model or "anthropic-default",
"output": output_items,
"parallel_tool_calls": True,
"previous_response_id": None,
"reasoning": {"effort": "medium", "generate_summary": "concise"},
"store": True,
"temperature": 1.0,
"text": {"format": {"type": "text"}},
"tool_choice": "auto",
"tools": [
{
"type": "computer_use_preview",
"display_height": 768,
"display_width": 1024,
"environment": "mac",
}
],
"top_p": 1.0,
"truncation": "auto",
"usage": {
"input_tokens": 0,
"input_tokens_details": {"cached_tokens": 0},
"output_tokens": 0,
"output_tokens_details": {"reasoning_tokens": 0},
"total_tokens": 0,
},
"user": None,
"metadata": {},
"response": {
"choices": [
{
"message": {
"role": "assistant",
"content": content,
"tool_calls": [],
},
"finish_reason": response.stop_reason or "stop",
}
]
},
}
# Add tool calls if present
tool_calls = []
for block in response.content:
if hasattr(block, "type") and block.type == "tool_use":
tool_calls.append(
{
"id": f"call_{block.id}",
"type": "function",
"function": {"name": block.name, "arguments": block.input},
}
)
if tool_calls:
standard_response["response"]["choices"][0]["message"]["tool_calls"] = tool_calls
return cast(AgentResponse, standard_response)

View File

@@ -1,212 +0,0 @@
"""Action execution for the Omni agent."""
import logging
from typing import Dict, Any, Tuple
import json
from .parser import ParseResult
logger = logging.getLogger(__name__)
class ActionExecutor:
"""Executes UI actions based on model instructions."""
def __init__(self, loop):
"""Initialize the action executor.
Args:
loop: Reference to the parent loop instance that provides context
"""
self.loop = loop
async def execute_action(self, content: Dict[str, Any], parsed_screen: ParseResult) -> bool:
"""Execute the action specified in the content.
Args:
content: Dictionary containing the action details
parsed_screen: Current parsed screen information
Returns:
Whether an action-specific screenshot was saved
"""
try:
action = content.get("Action", "").lower()
if not action:
return False
# Track if we saved an action-specific screenshot
action_screenshot_saved = False
try:
# Prepare kwargs based on action type
kwargs = {}
if action in ["left_click", "right_click", "double_click", "move_cursor"]:
try:
box_id = int(content["Box ID"])
logger.info(f"Processing Box ID: {box_id}")
# Calculate click coordinates using parser
x, y = await self.loop.parser.calculate_click_coordinates(
box_id, parsed_screen
)
logger.info(f"Calculated coordinates: x={x}, y={y}")
kwargs["x"] = x
kwargs["y"] = y
# Visualize action if screenshot is available
if parsed_screen.annotated_image_base64:
img_data = parsed_screen.annotated_image_base64
# Remove data URL prefix if present
if img_data.startswith("data:image"):
img_data = img_data.split(",")[1]
# Only save visualization for coordinate-based actions
self.loop.viz_helper.visualize_action(x, y, img_data)
action_screenshot_saved = True
except ValueError as e:
logger.error(f"Error processing Box ID: {str(e)}")
return False
elif action == "drag_to":
try:
box_id = int(content["Box ID"])
x, y = await self.loop.parser.calculate_click_coordinates(
box_id, parsed_screen
)
kwargs.update(
{
"x": x,
"y": y,
"button": content.get("button", "left"),
"duration": float(content.get("duration", 0.5)),
}
)
# Visualize drag destination if screenshot is available
if parsed_screen.annotated_image_base64:
img_data = parsed_screen.annotated_image_base64
# Remove data URL prefix if present
if img_data.startswith("data:image"):
img_data = img_data.split(",")[1]
# Only save visualization for coordinate-based actions
self.loop.viz_helper.visualize_action(x, y, img_data)
action_screenshot_saved = True
except ValueError as e:
logger.error(f"Error processing drag coordinates: {str(e)}")
return False
elif action == "type_text":
kwargs["text"] = content["Value"]
# For type_text, store the value in the action type
action_type = f"type_{content['Value'][:20]}" # Truncate if too long
elif action == "press_key":
kwargs["key"] = content["Value"]
action_type = f"press_{content['Value']}"
elif action == "hotkey":
if isinstance(content.get("Value"), list):
keys = content["Value"]
action_type = f"hotkey_{'_'.join(keys)}"
else:
# Simply split string format like "command+space" into a list
keys = [k.strip() for k in content["Value"].lower().split("+")]
action_type = f"hotkey_{content['Value'].replace('+', '_')}"
logger.info(f"Preparing hotkey with keys: {keys}")
# Get the method but call it with *args instead of **kwargs
method = getattr(self.loop.computer.interface, action)
await method(*keys) # Unpack the keys list as positional arguments
logger.info(f"Tool execution completed successfully: {action}")
# For hotkeys, take a screenshot after the action
try:
# Get a new screenshot after the action and save it with the action type
new_parsed_screen = await self.loop._get_parsed_screen_som(
save_screenshot=False
)
if new_parsed_screen and new_parsed_screen.annotated_image_base64:
img_data = new_parsed_screen.annotated_image_base64
# Remove data URL prefix if present
if img_data.startswith("data:image"):
img_data = img_data.split(",")[1]
# Save with action type to indicate this is a post-action screenshot
self.loop._save_screenshot(img_data, action_type=action_type)
action_screenshot_saved = True
except Exception as screenshot_error:
logger.error(
f"Error taking post-hotkey screenshot: {str(screenshot_error)}"
)
return action_screenshot_saved
elif action in ["scroll_down", "scroll_up"]:
clicks = int(content.get("amount", 1))
kwargs["clicks"] = clicks
action_type = f"scroll_{action.split('_')[1]}_{clicks}"
# Visualize scrolling if screenshot is available
if parsed_screen.annotated_image_base64:
img_data = parsed_screen.annotated_image_base64
# Remove data URL prefix if present
if img_data.startswith("data:image"):
img_data = img_data.split(",")[1]
direction = "down" if action == "scroll_down" else "up"
# For scrolling, we only save the visualization to avoid duplicate images
self.loop.viz_helper.visualize_scroll(direction, clicks, img_data)
action_screenshot_saved = True
else:
logger.warning(f"Unknown action: {action}")
return False
# Execute tool and handle result
try:
method = getattr(self.loop.computer.interface, action)
logger.info(f"Found method for action '{action}': {method}")
await method(**kwargs)
logger.info(f"Tool execution completed successfully: {action}")
# For non-coordinate based actions that don't already have visualizations,
# take a new screenshot after the action
if not action_screenshot_saved:
# Take a new screenshot
try:
# Get a new screenshot after the action and save it with the action type
new_parsed_screen = await self.loop._get_parsed_screen_som(
save_screenshot=False
)
if new_parsed_screen and new_parsed_screen.annotated_image_base64:
img_data = new_parsed_screen.annotated_image_base64
# Remove data URL prefix if present
if img_data.startswith("data:image"):
img_data = img_data.split(",")[1]
# Save with action type to indicate this is a post-action screenshot
if "action_type" in locals():
self.loop._save_screenshot(img_data, action_type=action_type)
else:
self.loop._save_screenshot(img_data, action_type=action)
# Update the action screenshot flag for this turn
action_screenshot_saved = True
except Exception as screenshot_error:
logger.error(
f"Error taking post-action screenshot: {str(screenshot_error)}"
)
except AttributeError as e:
logger.error(f"Method not found for action '{action}': {str(e)}")
return False
except Exception as tool_error:
logger.error(f"Tool execution failed: {str(tool_error)}")
return False
return action_screenshot_saved
except Exception as e:
logger.error(f"Error executing action {action}: {str(e)}")
return False
except Exception as e:
logger.error(f"Error in execute_action: {str(e)}")
return False

View File

@@ -13,13 +13,16 @@ from .parser import OmniParser, ParseResult
from ...core.loop import BaseLoop
from ...core.visualization import VisualizationHelper
from ...core.messages import StandardMessageManager, ImageRetentionConfig
from .utils import to_openai_agent_response_format
from ...core.types import AgentResponse
from computer import Computer
from .types import LLMProvider
from .clients.openai import OpenAIClient
from .clients.anthropic import AnthropicClient
from .prompts import SYSTEM_PROMPT
from .api_handler import OmniAPIHandler
from .action_executor import ActionExecutor
from .tools.manager import ToolManager
from .tools import ToolResult
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@@ -99,12 +102,42 @@ class OmniLoop(BaseLoop):
self.retry_count = 0
# Initialize handlers
self.api_handler = OmniAPIHandler(self)
self.action_executor = ActionExecutor(self)
self.viz_helper = VisualizationHelper(self)
self.api_handler = OmniAPIHandler(loop=self)
self.viz_helper = VisualizationHelper(agent=self)
# Initialize tool manager
self.tool_manager = ToolManager(computer=computer, provider=provider)
logger.info("OmniLoop initialized with StandardMessageManager")
async def initialize(self) -> None:
"""Initialize the loop by setting up tools and clients."""
# Initialize base class
await super().initialize()
# Initialize tool manager with error handling
try:
logger.info("Initializing tool manager...")
await self.tool_manager.initialize()
logger.info("Tool manager initialized successfully.")
except Exception as e:
logger.error(f"Error initializing tool manager: {str(e)}")
logger.warning("Will attempt to initialize tools on first use.")
# Initialize API clients based on provider
if self.provider == LLMProvider.ANTHROPIC:
self.client = AnthropicClient(
api_key=self.api_key,
model=self.model,
)
elif self.provider == LLMProvider.OPENAI:
self.client = OpenAIClient(
api_key=self.api_key,
model=self.model,
)
else:
raise ValueError(f"Unsupported provider: {self.provider}")
###########################################
# CLIENT INITIALIZATION - IMPLEMENTING ABSTRACT METHOD
###########################################
@@ -310,16 +343,23 @@ class OmniLoop(BaseLoop):
logger.info("Added converted assistant message")
try:
# Handle Anthropic response format
# Step 1: Normalize response to standard format based on provider
standard_content = []
raw_text = None
# Convert response to standardized content based on provider
if self.provider == LLMProvider.ANTHROPIC:
if hasattr(response, "content") and isinstance(response.content, list):
# First convert Anthropic response to standard format
standard_content = []
# Convert Anthropic response to standard format
for block in response.content:
if hasattr(block, "type"):
if block.type == "text":
standard_content.append({"type": "text", "text": block.text})
content = block.text
# Store raw text for JSON parsing
if raw_text is None:
raw_text = block.text
else:
raw_text += "\n" + block.text
else:
# Add other block types
block_dict = {}
@@ -327,165 +367,90 @@ class OmniLoop(BaseLoop):
if not key.startswith("_"):
block_dict[key] = value
standard_content.append(block_dict)
continue
# Add standard format response to messages using the message manager
add_assistant_message(standard_content)
# Now extract JSON from the content for action execution
# Try to find JSON in the text blocks
json_content = None
parsed_content = None
for block in response.content:
if hasattr(block, "type") and block.type == "text":
content = block.text
try:
# First look for JSON block
json_content = extract_data(content, "json")
parsed_content = json.loads(json_content)
logger.info("Successfully parsed JSON from code block")
break
except (json.JSONDecodeError, IndexError):
# If no JSON block, try to find JSON object in the text
try:
# Look for JSON object pattern
json_pattern = r"\{[^}]+\}"
json_match = re.search(json_pattern, content)
if json_match:
json_str = json_match.group(0)
parsed_content = json.loads(json_str)
logger.info("Successfully parsed JSON from text")
break
except json.JSONDecodeError:
continue
if parsed_content:
# Clean up Box ID format
if "Box ID" in parsed_content and isinstance(parsed_content["Box ID"], str):
parsed_content["Box ID"] = parsed_content["Box ID"].replace("Box #", "")
# Add any explanatory text as reasoning if not present
if "Explanation" not in parsed_content:
# Extract any text before the JSON as reasoning
if content:
text_before_json = content.split("{")[0].strip()
if text_before_json:
parsed_content["Explanation"] = text_before_json
# Log the parsed content for debugging
logger.info(f"Parsed content: {json.dumps(parsed_content, indent=2)}")
try:
# Execute action with current parsed screen info using the ActionExecutor
action_screenshot_saved = await self.action_executor.execute_action(
parsed_content, cast(ParseResult, parsed_screen)
)
except Exception as e:
logger.error(f"Error executing action: {str(e)}")
# Update the last assistant message with error
error_message = [
{"type": "text", "text": f"Error executing action: {str(e)}"}
]
# Replace the last assistant message with the error
self.message_manager.add_assistant_message(error_message)
return False, action_screenshot_saved
# Check if task is complete
if parsed_content.get("Action") == "None":
return False, action_screenshot_saved
return True, action_screenshot_saved
else:
logger.warning("No JSON found in response content")
return True, action_screenshot_saved
logger.warning("No text block found in Anthropic response")
return True, action_screenshot_saved
# Handle other providers' response formats
if isinstance(response, dict) and "choices" in response:
content = response["choices"][0]["message"]["content"]
else:
logger.warning("Invalid Anthropic response format")
return True, action_screenshot_saved
else:
content = response
# Assume OpenAI or compatible format
try:
raw_text = response["choices"][0]["message"]["content"]
standard_content = [{"type": "text", "text": raw_text}]
except (KeyError, TypeError, IndexError) as e:
logger.error(f"Invalid response format: {str(e)}")
return True, action_screenshot_saved
# Parse JSON content
if isinstance(content, str):
# Step 2: Add the normalized response to message history
add_assistant_message(standard_content)
# Step 3: Extract JSON from the content for action execution
parsed_content = None
# If we have raw text, try to extract JSON from it
if raw_text:
# Try different approaches to extract JSON
try:
# First try to parse the whole content as JSON
parsed_content = json.loads(content)
parsed_content = json.loads(raw_text)
logger.info("Successfully parsed whole content as JSON")
except json.JSONDecodeError:
try:
# Try to find JSON block
json_content = extract_data(content, "json")
json_content = extract_data(raw_text, "json")
parsed_content = json.loads(json_content)
logger.info("Successfully parsed JSON from code block")
except (json.JSONDecodeError, IndexError):
try:
# Look for JSON object pattern
json_pattern = r"\{[^}]+\}"
json_match = re.search(json_pattern, content)
json_match = re.search(json_pattern, raw_text)
if json_match:
json_str = json_match.group(0)
parsed_content = json.loads(json_str)
logger.info("Successfully parsed JSON from text")
else:
logger.error(f"No JSON found in content: {content}")
logger.error(f"No JSON found in content")
return True, action_screenshot_saved
except json.JSONDecodeError as e:
logger.error(f"Failed to parse JSON from text: {str(e)}")
return True, action_screenshot_saved
# Step 4: Process the parsed content if available
if parsed_content:
# Clean up Box ID format
if "Box ID" in parsed_content and isinstance(parsed_content["Box ID"], str):
parsed_content["Box ID"] = parsed_content["Box ID"].replace("Box #", "")
# Add any explanatory text as reasoning if not present
if "Explanation" not in parsed_content:
if "Explanation" not in parsed_content and raw_text:
# Extract any text before the JSON as reasoning
text_before_json = content.split("{")[0].strip()
text_before_json = raw_text.split("{")[0].strip()
if text_before_json:
parsed_content["Explanation"] = text_before_json
# Add response to messages with stringified content using our helper
add_assistant_message([{"type": "text", "text": json.dumps(parsed_content)}])
# Log the parsed content for debugging
logger.info(f"Parsed content: {json.dumps(parsed_content, indent=2)}")
# Step 5: Execute the action
try:
# Execute action with current parsed screen info using the ActionExecutor
action_screenshot_saved = await self.action_executor.execute_action(
parsed_content, cast(ParseResult, parsed_screen)
# Execute action using the common helper method
should_continue, action_screenshot_saved = (
await self._execute_action_with_tools(
parsed_content, cast(ParseResult, parsed_screen)
)
)
# Check if task is complete
if parsed_content.get("Action") == "None":
return False, action_screenshot_saved
return should_continue, action_screenshot_saved
except Exception as e:
logger.error(f"Error executing action: {str(e)}")
# Add error message using the message manager
# Update the last assistant message with error
error_message = [{"type": "text", "text": f"Error executing action: {str(e)}"}]
# Replace the last assistant message with the error
self.message_manager.add_assistant_message(error_message)
return False, action_screenshot_saved
# Check if task is complete
if parsed_content.get("Action") == "None":
return False, action_screenshot_saved
return True, action_screenshot_saved
elif isinstance(content, dict):
# Handle case where content is already a dictionary
add_assistant_message([{"type": "text", "text": json.dumps(content)}])
try:
# Execute action with current parsed screen info using the ActionExecutor
action_screenshot_saved = await self.action_executor.execute_action(
content, cast(ParseResult, parsed_screen)
)
except Exception as e:
logger.error(f"Error executing action: {str(e)}")
# Add error message using the message manager
error_message = [{"type": "text", "text": f"Error executing action: {str(e)}"}]
self.message_manager.add_assistant_message(error_message)
return False, action_screenshot_saved
# Check if task is complete
if content.get("Action") == "None":
return False, action_screenshot_saved
return True, action_screenshot_saved
return True, action_screenshot_saved
except Exception as e:
@@ -546,17 +511,14 @@ class OmniLoop(BaseLoop):
# MAIN LOOP - IMPLEMENTING ABSTRACT METHOD
###########################################
async def run(self, messages: List[Dict[str, Any]]) -> AsyncGenerator[Dict[str, Any], None]:
async def run(self, messages: List[Dict[str, Any]]) -> AsyncGenerator[AgentResponse, None]:
"""Run the agent loop with provided messages.
Implements abstract method from BaseLoop to handle the main agent loop
for the OmniLoop implementation.
Args:
messages: List of message objects in standard OpenAI format
messages: List of messages in standard OpenAI format
Yields:
Dict containing response data
Agent response format
"""
# Initialize the message manager with the provided messages
self.message_manager.messages = messages.copy()
@@ -640,15 +602,11 @@ class OmniLoop(BaseLoop):
# Update whether an action screenshot was saved this turn
action_screenshot_saved = action_screenshot_saved or new_screenshot_saved
# Create OpenAI-compatible response format - directly use StandardMessageManager method
openai_compatible_response = (
await self.message_manager.create_openai_compatible_response(
response=response,
messages=self.message_manager.messages,
parsed_screen=parsed_screen,
parser=self.parser,
model=self.model,
)
# Create OpenAI-compatible response format using utility function
openai_compatible_response = await to_openai_agent_response_format(
response=response,
messages=self.message_manager.messages,
model=self.model,
)
# Yield the response to the caller
@@ -680,3 +638,218 @@ class OmniLoop(BaseLoop):
# Create a brief delay before retrying
await asyncio.sleep(1)
async def process_model_response(self, response_text: str) -> Optional[Dict[str, Any]]:
"""Process model response to extract tool calls.
Args:
response_text: Model response text
Returns:
Extracted tool information, or None if no tool call was found
"""
try:
# Ensure tools are initialized before use
await self._ensure_tools_initialized()
# Look for tool use in the response
if "function_call" in response_text or "tool_use" in response_text:
# The extract_tool_call method should be implemented in the OmniAPIHandler
# For now, we'll just use a simple approach
# This will be replaced with the proper implementation
tool_info = None
if "function_call" in response_text:
# Extract function call params
try:
# Simple extraction - in real code this would be more robust
import json
import re
match = re.search(r'"function_call"\s*:\s*{([^}]+)}', response_text)
if match:
function_text = "{" + match.group(1) + "}"
tool_info = json.loads(function_text)
except Exception as e:
logger.error(f"Error extracting function call: {str(e)}")
if tool_info:
try:
# Execute the tool
result = await self.tool_manager.execute_tool(
name=tool_info.get("name"), tool_input=tool_info.get("arguments", {})
)
# Handle the result
return {"tool_result": result}
except Exception as e:
error_msg = (
f"Error executing tool '{tool_info.get('name', 'unknown')}': {str(e)}"
)
logger.error(error_msg)
return {"tool_result": ToolResult(error=error_msg)}
except Exception as e:
logger.error(f"Error processing tool call: {str(e)}")
return None
async def process_response_with_tools(
self, response_text: str, parsed_screen: Optional[ParseResult] = None
) -> Tuple[bool, str]:
"""Process model response and execute tools.
Args:
response_text: Model response text
parsed_screen: Current parsed screen information (optional)
Returns:
Tuple of (action_taken, observation)
"""
logger.info("Processing response with tools")
# Process the response to extract tool calls
tool_result = await self.process_model_response(response_text)
if tool_result and "tool_result" in tool_result:
# A tool was executed
result = tool_result["tool_result"]
if result.error:
return False, f"ERROR: {result.error}"
else:
return True, result.output or "Tool executed successfully"
# No action or tool call found
return False, "No action taken - no tool call detected in response"
###########################################
# UTILITY METHODS
###########################################
async def _ensure_tools_initialized(self) -> None:
"""Ensure the tool manager and tools are initialized before use."""
if not hasattr(self.tool_manager, "tools") or self.tool_manager.tools is None:
logger.info("Tools not initialized. Initializing now...")
await self.tool_manager.initialize()
logger.info("Tools initialized successfully.")
async def _execute_action_with_tools(
self, action_data: Dict[str, Any], parsed_screen: ParseResult
) -> Tuple[bool, bool]:
"""Execute an action using the tools-based approach.
Args:
action_data: Dictionary containing action details
parsed_screen: Current parsed screen information
Returns:
Tuple of (should_continue, action_screenshot_saved)
"""
action_screenshot_saved = False
action_type = None # Initialize for possible use in post-action screenshot
try:
# Extract the action
parsed_action = action_data.get("Action", "").lower()
# Only process if we have a valid action
if not parsed_action or parsed_action == "none":
return False, action_screenshot_saved
# Convert the parsed content to a format suitable for the tools system
tool_name = "computer" # Default to computer tool
tool_args = {"action": parsed_action}
# Add specific arguments based on action type
if parsed_action in ["left_click", "right_click", "double_click", "move_cursor"]:
# Calculate coordinates from Box ID using parser
try:
box_id = int(action_data["Box ID"])
x, y = await self.parser.calculate_click_coordinates(
box_id, cast(ParseResult, parsed_screen)
)
tool_args["x"] = x
tool_args["y"] = y
# Visualize action if screenshot is available
if parsed_screen and parsed_screen.annotated_image_base64:
img_data = parsed_screen.annotated_image_base64
# Remove data URL prefix if present
if img_data.startswith("data:image"):
img_data = img_data.split(",")[1]
# Save visualization for coordinate-based actions
self.viz_helper.visualize_action(x, y, img_data)
action_screenshot_saved = True
except (ValueError, KeyError) as e:
logger.error(f"Error processing Box ID: {str(e)}")
return False, action_screenshot_saved
elif parsed_action == "type_text":
tool_args["text"] = action_data.get("Value", "")
# For type_text, store the value in the action type for screenshot naming
action_type = f"type_{tool_args['text'][:20]}" # Truncate if too long
elif parsed_action == "press_key":
tool_args["key"] = action_data.get("Value", "")
action_type = f"press_{tool_args['key']}"
elif parsed_action == "hotkey":
value = action_data.get("Value", "")
if isinstance(value, list):
tool_args["keys"] = value
action_type = f"hotkey_{'_'.join(value)}"
else:
# Split string format like "command+space" into a list
keys = [k.strip() for k in value.lower().split("+")]
tool_args["keys"] = keys
action_type = f"hotkey_{value.replace('+', '_')}"
elif parsed_action in ["scroll_down", "scroll_up"]:
clicks = int(action_data.get("amount", 1))
tool_args["amount"] = clicks
action_type = f"scroll_{parsed_action.split('_')[1]}_{clicks}"
# Visualize scrolling if screenshot is available
if parsed_screen and parsed_screen.annotated_image_base64:
img_data = parsed_screen.annotated_image_base64
# Remove data URL prefix if present
if img_data.startswith("data:image"):
img_data = img_data.split(",")[1]
direction = "down" if parsed_action == "scroll_down" else "up"
# For scrolling, we save the visualization
self.viz_helper.visualize_scroll(direction, clicks, img_data)
action_screenshot_saved = True
# Ensure tools are initialized before use
await self._ensure_tools_initialized()
# Execute tool with prepared arguments
result = await self.tool_manager.execute_tool(name=tool_name, tool_input=tool_args)
# Take a new screenshot after the action if we haven't already saved one
if not action_screenshot_saved:
try:
# Get a new screenshot after the action
new_parsed_screen = await self._get_parsed_screen_som(save_screenshot=False)
if new_parsed_screen and new_parsed_screen.annotated_image_base64:
img_data = new_parsed_screen.annotated_image_base64
# Remove data URL prefix if present
if img_data.startswith("data:image"):
img_data = img_data.split(",")[1]
# Save with action type if defined, otherwise use the action name
if action_type:
self._save_screenshot(img_data, action_type=action_type)
else:
self._save_screenshot(img_data, action_type=parsed_action)
action_screenshot_saved = True
except Exception as screenshot_error:
logger.error(f"Error taking post-action screenshot: {str(screenshot_error)}")
# Continue the loop if the action is not "None"
return True, action_screenshot_saved
except Exception as e:
logger.error(f"Error executing action: {str(e)}")
# Update the last assistant message with error
error_message = [{"type": "text", "text": f"Error executing action: {str(e)}"}]
# Replace the last assistant message with the error
self.message_manager.add_assistant_message(error_message)
return False, action_screenshot_saved

View File

@@ -1 +1,30 @@
"""Omni provider tools - compatible with multiple LLM providers."""
from ....core.tools import BaseTool, ToolResult, ToolError, ToolFailure, CLIResult
from .base import BaseOmniTool
from .computer import ComputerTool
from .bash import BashTool
from .manager import ToolManager
# Re-export the tools with Omni-specific names for backward compatibility
OmniToolResult = ToolResult
OmniToolError = ToolError
OmniToolFailure = ToolFailure
OmniCLIResult = CLIResult
# We'll export specific tools once implemented
__all__ = [
"BaseTool",
"BaseOmniTool",
"ToolResult",
"ToolError",
"ToolFailure",
"CLIResult",
"OmniToolResult",
"OmniToolError",
"OmniToolFailure",
"OmniCLIResult",
"ComputerTool",
"BashTool",
"ToolManager",
]

View File

@@ -0,0 +1,29 @@
"""Omni-specific tool base classes."""
from abc import ABCMeta, abstractmethod
from typing import Any, Dict
from ....core.tools.base import BaseTool
class BaseOmniTool(BaseTool, metaclass=ABCMeta):
"""Abstract base class for Omni provider tools."""
def __init__(self):
"""Initialize the base Omni tool."""
# No specific initialization needed yet, but included for future extensibility
pass
@abstractmethod
async def __call__(self, **kwargs) -> Any:
"""Executes the tool with the given arguments."""
...
@abstractmethod
def to_params(self) -> Dict[str, Any]:
"""Convert tool to Omni provider-specific API parameters.
Returns:
Dictionary with tool parameters for the specific API
"""
raise NotImplementedError

View File

@@ -0,0 +1,74 @@
"""Bash tool for Omni provider."""
import logging
from typing import Any, Dict
from computer import Computer
from ....core.tools import ToolResult, ToolError
from .base import BaseOmniTool
logger = logging.getLogger(__name__)
class BashTool(BaseOmniTool):
"""Tool for executing bash commands."""
name = "bash"
description = "Execute bash commands on the system"
def __init__(self, computer: Computer):
"""Initialize the bash tool.
Args:
computer: Computer instance
"""
super().__init__()
self.computer = computer
def to_params(self) -> Dict[str, Any]:
"""Convert tool to API parameters.
Returns:
Dictionary with tool parameters
"""
return {
"type": "function",
"function": {
"name": self.name,
"description": self.description,
"parameters": {
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "The bash command to execute",
},
},
"required": ["command"],
},
},
}
async def __call__(self, **kwargs) -> ToolResult:
"""Execute bash command.
Args:
**kwargs: Command parameters
Returns:
Tool execution result
"""
try:
command = kwargs.get("command", "")
if not command:
return ToolResult(error="No command specified")
# The true implementation would use the actual method to run terminal commands
# Since we're getting linter errors, we'll just implement a placeholder that will
# be replaced with the correct implementation when this tool is fully integrated
logger.info(f"Would execute command: {command}")
return ToolResult(output=f"Command executed (placeholder): {command}")
except Exception as e:
logger.error(f"Error in bash tool: {str(e)}")
return ToolResult(error=f"Error: {str(e)}")

View File

@@ -0,0 +1,179 @@
"""Computer tool for Omni provider."""
import logging
from typing import Any, Dict
import json
from computer import Computer
from ....core.tools import ToolResult, ToolError
from .base import BaseOmniTool
from ..parser import ParseResult
logger = logging.getLogger(__name__)
class ComputerTool(BaseOmniTool):
"""Tool for interacting with the computer UI."""
name = "computer"
description = "Interact with the computer's graphical user interface"
def __init__(self, computer: Computer):
"""Initialize the computer tool.
Args:
computer: Computer instance
"""
super().__init__()
self.computer = computer
# Default to standard screen dimensions (will be set more accurately during initialization)
self.screen_dimensions = {"width": 1440, "height": 900}
async def initialize_dimensions(self) -> None:
"""Initialize screen dimensions."""
# For now, we'll use default values
# In the future, we can implement proper screen dimension detection
logger.info(f"Using default screen dimensions: {self.screen_dimensions}")
def to_params(self) -> Dict[str, Any]:
"""Convert tool to API parameters.
Returns:
Dictionary with tool parameters
"""
return {
"type": "function",
"function": {
"name": self.name,
"description": self.description,
"parameters": {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": [
"left_click",
"right_click",
"double_click",
"move_cursor",
"drag_to",
"type_text",
"press_key",
"hotkey",
"scroll_up",
"scroll_down",
],
"description": "The action to perform",
},
"x": {
"type": "number",
"description": "X coordinate for click or cursor movement",
},
"y": {
"type": "number",
"description": "Y coordinate for click or cursor movement",
},
"box_id": {
"type": "integer",
"description": "ID of the UI element to interact with",
},
"text": {
"type": "string",
"description": "Text to type",
},
"key": {
"type": "string",
"description": "Key to press",
},
"keys": {
"type": "array",
"items": {"type": "string"},
"description": "Keys to press as hotkey combination",
},
"amount": {
"type": "integer",
"description": "Amount to scroll",
},
"duration": {
"type": "number",
"description": "Duration for drag operations",
},
},
"required": ["action"],
},
},
}
async def __call__(self, **kwargs) -> ToolResult:
"""Execute computer action.
Args:
**kwargs: Action parameters
Returns:
Tool execution result
"""
try:
action = kwargs.get("action", "").lower()
if not action:
return ToolResult(error="No action specified")
# Execute the action on the computer
method = getattr(self.computer.interface, action, None)
if not method:
return ToolResult(error=f"Unsupported action: {action}")
# Prepare arguments based on action type
args = {}
if action in ["left_click", "right_click", "double_click", "move_cursor"]:
x = kwargs.get("x")
y = kwargs.get("y")
if x is None or y is None:
box_id = kwargs.get("box_id")
if box_id is None:
return ToolResult(error="Box ID or coordinates required")
# Get coordinates from box_id implementation would be here
# For now, return error
return ToolResult(error="Box ID-based clicking not implemented yet")
args["x"] = x
args["y"] = y
elif action == "drag_to":
x = kwargs.get("x")
y = kwargs.get("y")
if x is None or y is None:
return ToolResult(error="Coordinates required for drag_to")
args.update(
{
"x": x,
"y": y,
"button": kwargs.get("button", "left"),
"duration": float(kwargs.get("duration", 0.5)),
}
)
elif action == "type_text":
text = kwargs.get("text")
if not text:
return ToolResult(error="Text required for type_text")
args["text"] = text
elif action == "press_key":
key = kwargs.get("key")
if not key:
return ToolResult(error="Key required for press_key")
args["key"] = key
elif action == "hotkey":
keys = kwargs.get("keys")
if not keys:
return ToolResult(error="Keys required for hotkey")
# Call with positional arguments instead of kwargs
await method(*keys)
return ToolResult(output=f"Hotkey executed: {'+'.join(keys)}")
elif action in ["scroll_down", "scroll_up"]:
args["clicks"] = int(kwargs.get("amount", 1))
# Execute action with prepared arguments
await method(**args)
return ToolResult(output=f"Action {action} executed successfully")
except Exception as e:
logger.error(f"Error executing computer action: {str(e)}")
return ToolResult(error=f"Error: {str(e)}")

View File

@@ -0,0 +1,61 @@
"""Tool manager for the Omni provider."""
from typing import Any, Dict, List
from computer.computer import Computer
from ....core.tools import BaseToolManager, ToolResult
from ....core.tools.collection import ToolCollection
from .computer import ComputerTool
from .bash import BashTool
from ..types import LLMProvider
class ToolManager(BaseToolManager):
"""Manages Omni provider tool initialization and execution."""
def __init__(self, computer: Computer, provider: LLMProvider):
"""Initialize the tool manager.
Args:
computer: Computer instance for computer-related tools
provider: The LLM provider being used
"""
super().__init__(computer)
self.provider = provider
# Initialize Omni-specific tools
self.computer_tool = ComputerTool(self.computer)
self.bash_tool = BashTool(self.computer)
def _initialize_tools(self) -> ToolCollection:
"""Initialize all available tools."""
return ToolCollection(self.computer_tool, self.bash_tool)
async def _initialize_tools_specific(self) -> None:
"""Initialize Omni provider-specific tool requirements."""
await self.computer_tool.initialize_dimensions()
def get_tool_params(self) -> List[Dict[str, Any]]:
"""Get tool parameters for API calls.
Returns:
List of tool parameters for the current provider's API
"""
if self.tools is None:
raise RuntimeError("Tools not initialized. Call initialize() first.")
return self.tools.to_params()
async def execute_tool(self, name: str, tool_input: dict[str, Any]) -> ToolResult:
"""Execute a tool with the given input.
Args:
name: Name of the tool to execute
tool_input: Input parameters for the tool
Returns:
Result of the tool execution
"""
if self.tools is None:
raise RuntimeError("Tools not initialized. Call initialize() first.")
return await self.tools.run(name=name, tool_input=tool_input)

View File

@@ -0,0 +1,236 @@
"""Main entry point for computer agents."""
import asyncio
import json
import logging
import os
from typing import Any, Dict, List, Optional
from som.models import ParseResult
from ...core.types import AgentResponse
logger = logging.getLogger(__name__)
async def to_openai_agent_response_format(
response: Any,
messages: List[Dict[str, Any]],
parsed_screen: Optional[ParseResult] = None,
parser: Optional[Any] = None,
model: Optional[str] = None,
) -> AgentResponse:
"""Create an OpenAI computer use agent compatible response format.
Args:
response: The original API response
messages: List of messages in standard OpenAI format
parsed_screen: Optional pre-parsed screen information
parser: Optional parser instance for coordinate calculation
model: Optional model name
Returns:
A response formatted according to OpenAI's computer use agent standard, including:
- All standard OpenAI computer use agent fields
- Original response in response.choices[0].message
- Full message history in messages field
"""
from datetime import datetime
import time
# Create a unique ID for this response
response_id = f"resp_{datetime.now().strftime('%Y%m%d%H%M%S')}_{id(response)}"
reasoning_id = f"rs_{response_id}"
action_id = f"cu_{response_id}"
call_id = f"call_{response_id}"
# Extract the last assistant message
assistant_msg = None
for msg in reversed(messages):
if msg["role"] == "assistant":
assistant_msg = msg
break
if not assistant_msg:
# If no assistant message found, create a default one
assistant_msg = {"role": "assistant", "content": "No response available"}
# Initialize output array
output_items = []
# Extract reasoning and action details from the response
content = assistant_msg["content"]
reasoning_text = None
action_details = None
for item in content:
if isinstance(item, dict) and item.get("type") == "text":
try:
# Try to parse JSON from text block
text_content = item.get("text", "")
parsed_json = json.loads(text_content)
# Get reasoning text
if reasoning_text is None:
reasoning_text = parsed_json.get("Explanation", "")
# Extract action details
action = parsed_json.get("Action", "").lower()
text_input = parsed_json.get("Text", "")
value = parsed_json.get("Value", "") # Also handle Value field
box_id = parsed_json.get("Box ID") # Extract Box ID
if action in ["click", "left_click"]:
# Always calculate coordinates from Box ID for click actions
x, y = 100, 100 # Default fallback values
if parsed_screen and box_id is not None and parser is not None:
try:
box_id_int = (
box_id
if isinstance(box_id, int)
else int(str(box_id)) if str(box_id).isdigit() else None
)
if box_id_int is not None:
# Use the parser's method to calculate coordinates
x, y = await parser.calculate_click_coordinates(
box_id_int, parsed_screen
)
except Exception as e:
logger.error(
f"Error extracting coordinates for Box ID {box_id}: {str(e)}"
)
action_details = {
"type": "click",
"button": "left",
"box_id": (
(
box_id
if isinstance(box_id, int)
else int(box_id) if str(box_id).isdigit() else None
)
if box_id is not None
else None
),
"x": x,
"y": y,
}
elif action in ["type", "type_text"] and (text_input or value):
action_details = {
"type": "type",
"text": text_input or value,
}
elif action == "hotkey" and value:
action_details = {
"type": "hotkey",
"keys": value,
}
elif action == "scroll":
# Use default coordinates for scrolling
delta_x = 0
delta_y = 0
# Try to extract scroll delta values from content if available
scroll_data = parsed_json.get("Scroll", {})
if scroll_data:
delta_x = scroll_data.get("delta_x", 0)
delta_y = scroll_data.get("delta_y", 0)
action_details = {
"type": "scroll",
"x": 100,
"y": 100,
"scroll_x": delta_x,
"scroll_y": delta_y,
}
elif action == "none":
# Handle case when action is None (task completion)
action_details = {"type": "none", "description": "Task completed"}
except json.JSONDecodeError:
# If not JSON, just use as reasoning text
if reasoning_text is None:
reasoning_text = ""
reasoning_text += item.get("text", "")
# Add reasoning item if we have text content
if reasoning_text:
output_items.append(
{
"type": "reasoning",
"id": reasoning_id,
"summary": [
{
"type": "summary_text",
"text": reasoning_text[:200], # Truncate to reasonable length
}
],
}
)
# If no action details extracted, use default
if not action_details:
action_details = {
"type": "click",
"button": "left",
"x": 100,
"y": 100,
}
# Add computer_call item
computer_call = {
"type": "computer_call",
"id": action_id,
"call_id": call_id,
"action": action_details,
"pending_safety_checks": [],
"status": "completed",
}
output_items.append(computer_call)
# Extract user and assistant messages from the history
user_messages = []
assistant_messages = []
for msg in messages:
if msg["role"] == "user":
user_messages.append(msg)
elif msg["role"] == "assistant":
assistant_messages.append(msg)
# Create the OpenAI-compatible response format with all expected fields
return {
"id": response_id,
"object": "response",
"created_at": int(time.time()),
"status": "completed",
"error": None,
"incomplete_details": None,
"instructions": None,
"max_output_tokens": None,
"model": model or "unknown",
"output": output_items,
"parallel_tool_calls": True,
"previous_response_id": None,
"reasoning": {"effort": "medium", "generate_summary": "concise"},
"store": True,
"temperature": 1.0,
"text": {"format": {"type": "text"}},
"tool_choice": "auto",
"tools": [
{
"type": "computer_use_preview",
"display_height": 768,
"display_width": 1024,
"environment": "mac",
}
],
"top_p": 1.0,
"truncation": "auto",
"usage": {
"input_tokens": 0, # Placeholder values
"input_tokens_details": {"cached_tokens": 0},
"output_tokens": 0, # Placeholder values
"output_tokens_details": {"reasoning_tokens": 0},
"total_tokens": 0, # Placeholder values
},
"user": None,
"metadata": {},
# Include the original response for backward compatibility
"response": {"choices": [{"message": assistant_msg, "finish_reason": "stop"}]},
}

148
libs/lume/scripts/install.sh Executable file
View File

@@ -0,0 +1,148 @@
#!/bin/bash
set -e
# Lume Installer
# This script installs Lume to your system
# Define colors for output
BOLD=$(tput bold)
NORMAL=$(tput sgr0)
RED=$(tput setaf 1)
GREEN=$(tput setaf 2)
BLUE=$(tput setaf 4)
# Default installation directory
DEFAULT_INSTALL_DIR="/usr/local/bin"
INSTALL_DIR="${INSTALL_DIR:-$DEFAULT_INSTALL_DIR}"
# GitHub info
GITHUB_REPO="trycua/cua"
LATEST_RELEASE_URL="https://api.github.com/repos/$GITHUB_REPO/releases/latest"
echo "${BOLD}${BLUE}Lume Installer${NORMAL}"
echo "This script will install Lume to your system."
# Check if we're running with appropriate permissions
check_permissions() {
if [ "$INSTALL_DIR" = "$DEFAULT_INSTALL_DIR" ] && [ "$(id -u)" != "0" ]; then
echo "${RED}Error: Installing to $INSTALL_DIR requires root privileges.${NORMAL}"
echo "Please run with sudo or specify a different directory with INSTALL_DIR environment variable."
echo "Example: INSTALL_DIR=\$HOME/.local/bin $0"
exit 1
fi
}
# Detect OS and architecture
detect_platform() {
OS=$(uname -s | tr '[:upper:]' '[:lower:]')
ARCH=$(uname -m)
if [ "$OS" != "darwin" ]; then
echo "${RED}Error: Currently only macOS is supported.${NORMAL}"
exit 1
fi
if [ "$ARCH" != "arm64" ]; then
echo "${RED}Error: Lume only supports macOS on Apple Silicon (ARM64).${NORMAL}"
exit 1
fi
PLATFORM="darwin-arm64"
echo "Detected platform: ${BOLD}$PLATFORM${NORMAL}"
}
# Create temporary directory
create_temp_dir() {
TEMP_DIR=$(mktemp -d)
echo "Using temporary directory: $TEMP_DIR"
# Make sure we clean up on exit
trap 'rm -rf "$TEMP_DIR"' EXIT
}
# Download the latest release
download_release() {
echo "Downloading latest Lume release..."
# Use the direct download link with the non-versioned symlink
DOWNLOAD_URL="https://github.com/$GITHUB_REPO/releases/latest/download/lume.tar.gz"
echo "Downloading from: $DOWNLOAD_URL"
# Download the tarball
if command -v curl &> /dev/null; then
curl -L --progress-bar "$DOWNLOAD_URL" -o "$TEMP_DIR/lume.tar.gz"
# Verify the download was successful
if [ ! -s "$TEMP_DIR/lume.tar.gz" ]; then
echo "${RED}Error: Failed to download Lume.${NORMAL}"
echo "The download URL may be incorrect or the file may not exist."
exit 1
fi
# Verify the file is a valid archive
if ! tar -tzf "$TEMP_DIR/lume.tar.gz" > /dev/null 2>&1; then
echo "${RED}Error: The downloaded file is not a valid tar.gz archive.${NORMAL}"
echo "Let's try the alternative URL..."
# Try alternative URL
ALT_DOWNLOAD_URL="https://github.com/$GITHUB_REPO/releases/latest/download/lume-$PLATFORM.tar.gz"
echo "Downloading from alternative URL: $ALT_DOWNLOAD_URL"
curl -L --progress-bar "$ALT_DOWNLOAD_URL" -o "$TEMP_DIR/lume.tar.gz"
# Check again
if ! tar -tzf "$TEMP_DIR/lume.tar.gz" > /dev/null 2>&1; then
echo "${RED}Error: Could not download a valid Lume archive.${NORMAL}"
echo "Please try installing Lume manually from: https://github.com/$GITHUB_REPO/releases/latest"
exit 1
fi
fi
else
echo "${RED}Error: curl is required but not installed.${NORMAL}"
exit 1
fi
}
# Extract and install
install_binary() {
echo "Extracting archive..."
tar -xzf "$TEMP_DIR/lume.tar.gz" -C "$TEMP_DIR"
echo "Installing to $INSTALL_DIR..."
# Create install directory if it doesn't exist
mkdir -p "$INSTALL_DIR"
# Move the binary to the installation directory
mv "$TEMP_DIR/lume" "$INSTALL_DIR/"
# Make the binary executable
chmod +x "$INSTALL_DIR/lume"
echo "${GREEN}Installation complete!${NORMAL}"
echo "Lume has been installed to ${BOLD}$INSTALL_DIR/lume${NORMAL}"
# Check if the installation directory is in PATH
if [ -n "${PATH##*$INSTALL_DIR*}" ]; then
echo "${RED}Warning: $INSTALL_DIR is not in your PATH.${NORMAL}"
echo "You may need to add it to your shell profile:"
echo " For bash: echo 'export PATH=\"\$PATH:$INSTALL_DIR\"' >> ~/.bash_profile"
echo " For zsh: echo 'export PATH=\"\$PATH:$INSTALL_DIR\"' >> ~/.zshrc"
echo " For fish: echo 'fish_add_path $INSTALL_DIR' >> ~/.config/fish/config.fish"
fi
}
# Main installation flow
main() {
check_permissions
detect_platform
create_temp_dir
download_release
install_binary
echo ""
echo "${GREEN}${BOLD}Lume has been successfully installed!${NORMAL}"
echo "Run ${BOLD}lume${NORMAL} to get started."
}
# Run the installation
main