Merge pull request #637 from trycua/feat/fara-browser-use

Add tool decorator system, browser-use tool, and fixes for Fara-7B
Committed by ddupont (via GitHub) on 2025-12-17 14:38:55 -08:00
14 changed files with 2186 additions and 177 deletions


@@ -0,0 +1,97 @@
"""
Browser Agent Example
Demonstrates how to use the ComputerAgent with BrowserTool to control a browser
programmatically via the computer server, using an Azure AI model (Fara-7B).
Prerequisites:
- Computer server running (Docker container or local)
- For Docker: Container should be running with browser tool support
- For local: Playwright and Firefox must be installed
Usage:
python examples/browser_agent_example.py
"""
import asyncio
import logging
import signal
import sys
import traceback
from pathlib import Path
# Add the libs path to sys.path
libs_path = Path(__file__).parent.parent / "libs" / "python"
sys.path.insert(0, str(libs_path))
from agent import ComputerAgent
from agent.tools import BrowserTool
from computer import Computer
# Import utility functions
from utils import handle_sigint, load_dotenv_files
# Configure logging to see what's happening
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def run_browser_agent_example():
"""Run example of using ComputerAgent with BrowserTool."""
print("\n=== Example: ComputerAgent with BrowserTool (Azure AI - Fara-7B) ===")
try:
# Initialize the computer interface
# For local testing, use provider_type="docker"
# For provider_type="cloud", provide name and api_key
computer = Computer(
provider_type="docker",
os_type="linux",
image="cua-xfce:dev",
# verbosity=logging.DEBUG,
)
await computer.run()
# Initialize the browser tool with the computer interface
browser = BrowserTool(interface=computer)
# Create ComputerAgent with BrowserTool
agent = ComputerAgent(
model="azure_ml/Fara-7B",
tools=[browser],
api_base="https://foundry-inference-gxttr.centralus.inference.ml.azure.com",
api_key="YOUR API KEY HERE",
only_n_most_recent_images=3,
# verbosity=logging.DEBUG,
trajectory_dir="trajectories",
max_trajectory_budget=1.0,
)
# Open playground UI
agent.open()
# Wait until key press
input("Press enter to exit")
except Exception as e:
logger.error(f"Error in run_browser_agent_example: {e}")
traceback.print_exc()
raise
def main():
"""Run the browser agent example."""
try:
load_dotenv_files()
# Register signal handler for graceful exit
signal.signal(signal.SIGINT, handle_sigint)
asyncio.run(run_browser_agent_example())
except Exception as e:
print(f"Error running example: {e}")
traceback.print_exc()
if __name__ == "__main__":
main()
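Rather than hardcoding the key as in the snippet above, the same example can read the Azure endpoint and key from the environment. A minimal sketch follows; the environment variable names are illustrative and not defined anywhere in this PR:

import os

# Hypothetical variable names; substitute whatever your deployment uses.
api_base = os.environ.get(
    "AZURE_ML_API_BASE",
    "https://foundry-inference-gxttr.centralus.inference.ml.azure.com",
)
api_key = os.environ["AZURE_ML_API_KEY"]  # fail early with KeyError if the key is missing

# Pass api_base=api_base and api_key=api_key to ComputerAgent instead of the literals above.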


@@ -2,12 +2,14 @@
Adapters package for agent - Custom LLM adapters for LiteLLM
"""
from .azure_ml_adapter import AzureMLAdapter
from .cua_adapter import CUAAdapter
from .huggingfacelocal_adapter import HuggingFaceLocalAdapter
from .human_adapter import HumanAdapter
from .mlxvlm_adapter import MLXVLMAdapter
__all__ = [
"AzureMLAdapter",
"HuggingFaceLocalAdapter",
"HumanAdapter",
"MLXVLMAdapter",


@@ -0,0 +1,283 @@
"""
Azure ML Custom Provider Adapter for LiteLLM.
This adapter provides direct OpenAI-compatible API access to Azure ML endpoints
without message transformation, specifically for models like Fara-7B that require
exact OpenAI message formatting.
"""
import json
from typing import Any, AsyncIterator, Dict, Iterator, List, Optional
import httpx
from litellm import acompletion, completion
from litellm.llms.custom_llm import CustomLLM
from litellm.types.utils import GenericStreamingChunk, ModelResponse
class AzureMLAdapter(CustomLLM):
"""
Azure ML Adapter for OpenAI-compatible endpoints.
Makes direct HTTP calls to Azure ML foundry inference endpoints
using the OpenAI-compatible API format without transforming messages.
Usage:
model = "azure_ml/Fara-7B"
api_base = "https://foundry-inference-xxx.centralus.inference.ml.azure.com"
api_key = "your-api-key"
response = litellm.completion(
model=model,
messages=[...],
api_base=api_base,
api_key=api_key
)
"""
def __init__(self, **kwargs):
"""Initialize the adapter."""
super().__init__()
self._client: Optional[httpx.Client] = None
self._async_client: Optional[httpx.AsyncClient] = None
def _get_client(self) -> httpx.Client:
"""Get or create sync HTTP client."""
if self._client is None:
self._client = httpx.Client(timeout=600.0)
return self._client
def _get_async_client(self) -> httpx.AsyncClient:
"""Get or create async HTTP client."""
if self._async_client is None:
self._async_client = httpx.AsyncClient(timeout=600.0)
return self._async_client
def _prepare_request(self, **kwargs) -> tuple[str, dict, dict]:
"""
Prepare the HTTP request without transforming messages.
Applies Azure ML workaround: double-encodes function arguments to work around
Azure ML's bug where it parses arguments before validation.
Returns:
Tuple of (url, headers, json_data)
"""
# Extract required params
api_base = kwargs.get("api_base")
api_key = kwargs.get("api_key")
model = kwargs.get("model", "").replace("azure_ml/", "")
messages = kwargs.get("messages", [])
if not api_base:
raise ValueError("api_base is required for azure_ml provider")
if not api_key:
raise ValueError("api_key is required for azure_ml provider")
# Build OpenAI-compatible endpoint URL
base_url = api_base.rstrip("/")
url = f"{base_url}/chat/completions"
# Prepare headers
headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
# WORKAROUND for Azure ML bug:
# Azure ML incorrectly parses the arguments field before validation,
# causing it to reject valid JSON strings. We double-encode arguments
# so that after Azure ML's parse, they remain as strings.
messages_copy = []
for message in messages:
msg_copy = message.copy()
# Check if message has tool_calls that need double-encoding
if "tool_calls" in msg_copy:
tool_calls_copy = []
for tool_call in msg_copy["tool_calls"]:
tc_copy = tool_call.copy()
if "function" in tc_copy and "arguments" in tc_copy["function"]:
func_copy = tc_copy["function"].copy()
arguments = func_copy["arguments"]
# If arguments is already a string, double-encode it
if isinstance(arguments, str):
func_copy["arguments"] = json.dumps(arguments)
tc_copy["function"] = func_copy
tool_calls_copy.append(tc_copy)
msg_copy["tool_calls"] = tool_calls_copy
messages_copy.append(msg_copy)
# Prepare request body with double-encoded messages
json_data = {"model": model, "messages": messages_copy}
# Add optional parameters if provided
optional_params = [
"temperature",
"top_p",
"n",
"stream",
"stop",
"max_tokens",
"presence_penalty",
"frequency_penalty",
"logit_bias",
"user",
"response_format",
"seed",
"tools",
"tool_choice",
]
for param in optional_params:
if param in kwargs and kwargs[param] is not None:
json_data[param] = kwargs[param]
return url, headers, json_data
def completion(self, *args, **kwargs) -> ModelResponse:
"""
Synchronous completion method.
Makes a direct HTTP POST to Azure ML's OpenAI-compatible endpoint.
"""
url, headers, json_data = self._prepare_request(**kwargs)
client = self._get_client()
response = client.post(url, headers=headers, json=json_data)
response.raise_for_status()
# Parse response
response_json = response.json()
# Return using litellm's completion with the actual response
return completion(
model=f"azure_ml/{kwargs.get('model', '')}",
mock_response=response_json["choices"][0]["message"]["content"],
messages=kwargs.get("messages", []),
)
async def acompletion(self, *args, **kwargs) -> ModelResponse:
"""
Asynchronous completion method.
Makes a direct async HTTP POST to Azure ML's OpenAI-compatible endpoint.
"""
url, headers, json_data = self._prepare_request(**kwargs)
client = self._get_async_client()
response = await client.post(url, headers=headers, json=json_data)
response.raise_for_status()
# Parse response
response_json = response.json()
# Return using litellm's acompletion with the actual response
return await acompletion(
model=f"azure_ml/{kwargs.get('model', '')}",
mock_response=response_json["choices"][0]["message"]["content"],
messages=kwargs.get("messages", []),
)
def streaming(self, *args, **kwargs) -> Iterator[GenericStreamingChunk]:
"""
Synchronous streaming method.
Makes a streaming HTTP POST to Azure ML's OpenAI-compatible endpoint.
"""
url, headers, json_data = self._prepare_request(**kwargs)
json_data["stream"] = True
client = self._get_client()
with client.stream("POST", url, headers=headers, json=json_data) as response:
response.raise_for_status()
for line in response.iter_lines():
if line.startswith("data: "):
data = line[6:] # Remove "data: " prefix
if data == "[DONE]":
break
try:
chunk_json = json.loads(data)
delta = chunk_json["choices"][0].get("delta", {})
content = delta.get("content", "")
finish_reason = chunk_json["choices"][0].get("finish_reason")
generic_streaming_chunk: GenericStreamingChunk = {
"finish_reason": finish_reason,
"index": 0,
"is_finished": finish_reason is not None,
"text": content,
"tool_use": None,
"usage": chunk_json.get(
"usage",
{"completion_tokens": 0, "prompt_tokens": 0, "total_tokens": 0},
),
}
yield generic_streaming_chunk
except json.JSONDecodeError:
continue
async def astreaming(self, *args, **kwargs) -> AsyncIterator[GenericStreamingChunk]:
"""
Asynchronous streaming method.
Makes an async streaming HTTP POST to Azure ML's OpenAI-compatible endpoint.
"""
url, headers, json_data = self._prepare_request(**kwargs)
json_data["stream"] = True
client = self._get_async_client()
async with client.stream("POST", url, headers=headers, json=json_data) as response:
response.raise_for_status()
async for line in response.aiter_lines():
if line.startswith("data: "):
data = line[6:] # Remove "data: " prefix
if data == "[DONE]":
break
try:
chunk_json = json.loads(data)
delta = chunk_json["choices"][0].get("delta", {})
content = delta.get("content", "")
finish_reason = chunk_json["choices"][0].get("finish_reason")
generic_streaming_chunk: GenericStreamingChunk = {
"finish_reason": finish_reason,
"index": 0,
"is_finished": finish_reason is not None,
"text": content,
"tool_use": None,
"usage": chunk_json.get(
"usage",
{"completion_tokens": 0, "prompt_tokens": 0, "total_tokens": 0},
),
}
yield generic_streaming_chunk
except json.JSONDecodeError:
continue
def __del__(self):
"""Cleanup HTTP clients."""
if self._client is not None:
self._client.close()
if self._async_client is not None:
import asyncio
try:
loop = asyncio.get_event_loop()
if loop.is_running():
loop.create_task(self._async_client.aclose())
else:
loop.run_until_complete(self._async_client.aclose())
except Exception:
pass
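To make the double-encoding workaround concrete, here is a small illustrative sketch (not part of the diff) of what _prepare_request does to an assistant tool call before it is sent to Azure ML:

import json

# An assistant message as the agent produced it: arguments is already a JSON string.
message = {
    "role": "assistant",
    "tool_calls": [{
        "type": "function",
        "id": "call_0",
        "function": {"name": "computer", "arguments": '{"action": "left_click", "x": 10, "y": 20}'},
    }],
}

args = message["tool_calls"][0]["function"]["arguments"]
double_encoded = json.dumps(args)  # the string is JSON-encoded a second time
# After Azure ML prematurely json-parses the arguments field, it recovers the
# original JSON string, so validation still sees a string as required.
assert json.loads(double_encoded) == args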


@@ -23,7 +23,13 @@ import litellm
import litellm.utils
from litellm.responses.utils import Usage
from .adapters import CUAAdapter, HuggingFaceLocalAdapter, HumanAdapter, MLXVLMAdapter
from .adapters import (
AzureMLAdapter,
CUAAdapter,
HuggingFaceLocalAdapter,
HumanAdapter,
MLXVLMAdapter,
)
from .callbacks import (
BudgetManagerCallback,
ImageRetentionCallback,
@@ -39,6 +45,7 @@ from .responses import (
make_tool_error_item,
replace_failed_computer_calls_with_function_calls,
)
from .tools.base import BaseComputerTool, BaseTool
from .types import AgentCapability, IllegalArgumentError, Messages, ToolError
@@ -268,11 +275,13 @@ class ComputerAgent:
human_adapter = HumanAdapter()
mlx_adapter = MLXVLMAdapter()
cua_adapter = CUAAdapter()
azure_ml_adapter = AzureMLAdapter()
litellm.custom_provider_map = [
{"provider": "huggingface-local", "custom_handler": hf_adapter},
{"provider": "human", "custom_handler": human_adapter},
{"provider": "mlx", "custom_handler": mlx_adapter},
{"provider": "cua", "custom_handler": cua_adapter},
{"provider": "azure_ml", "custom_handler": azure_ml_adapter},
]
litellm.suppress_debug_info = True
@@ -308,10 +317,20 @@ class ComputerAgent:
# Find computer tool and create interface adapter
computer_handler = None
for schema in self.tool_schemas:
if schema["type"] == "computer":
computer_handler = await make_computer_handler(schema["computer"])
# First check if any tool is a BaseComputerTool instance
for tool in self.tools:
if isinstance(tool, BaseComputerTool):
computer_handler = tool
break
# If no BaseComputerTool found, look for traditional computer objects
if computer_handler is None:
for schema in self.tool_schemas:
if schema["type"] == "computer":
computer_handler = await make_computer_handler(schema["computer"])
break
self.computer_handler = computer_handler
def _process_input(self, input: Messages) -> List[Dict[str, Any]]:
@@ -329,6 +348,14 @@ class ComputerAgent:
if is_agent_computer(tool):
# This is a computer tool - will be handled by agent loop
schemas.append({"type": "computer", "computer": tool})
elif isinstance(tool, BaseTool):
# BaseTool instance - extract schema from its properties
function_schema = {
"name": tool.name,
"description": tool.description,
"parameters": tool.parameters,
}
schemas.append({"type": "function", "function": function_schema})
elif callable(tool):
# Use litellm.utils.function_to_dict to extract schema from docstring
try:
@@ -341,10 +368,14 @@ class ComputerAgent:
return schemas
def _get_tool(self, name: str) -> Optional[Callable]:
def _get_tool(self, name: str) -> Optional[Union[Callable, BaseTool]]:
"""Get a tool by name"""
for tool in self.tools:
if hasattr(tool, "__name__") and tool.__name__ == name:
# Check if it's a BaseTool instance
if isinstance(tool, BaseTool) and tool.name == name:
return tool
# Check if it's a regular callable
elif hasattr(tool, "__name__") and tool.__name__ == name:
return tool
elif hasattr(tool, "func") and tool.func.__name__ == name:
return tool
@@ -568,14 +599,19 @@ class ComputerAgent:
args = json.loads(item.get("arguments"))
# Validate arguments before execution
assert_callable_with(function, **args)
# Execute function - use asyncio.to_thread for non-async functions
if inspect.iscoroutinefunction(function):
result = await function(**args)
# Handle BaseTool instances
if isinstance(function, BaseTool):
# BaseTool.call() handles its own execution
result = function.call(args)
else:
result = await asyncio.to_thread(function, **args)
# Validate arguments before execution for regular callables
assert_callable_with(function, **args)
# Execute function - use asyncio.to_thread for non-async functions
if inspect.iscoroutinefunction(function):
result = await function(**args)
else:
result = await asyncio.to_thread(function, **args)
# Create function call output
call_output = {
@@ -767,3 +803,22 @@ class ComputerAgent:
if hasattr(self.agent_loop, "get_capabilities"):
return self.agent_loop.get_capabilities()
return ["step"] # Default capability
def open(self, port: Optional[int] = None):
"""
Start the playground server and open it in the browser.
This method starts a local HTTP server that exposes the /responses endpoint
and automatically opens the CUA playground interface in the default browser.
Args:
port: Port to run the server on. If None, finds an available port automatically.
Example:
>>> agent = ComputerAgent(model="claude-sonnet-4")
>>> agent.open() # Starts server and opens browser
"""
from .playground import PlaygroundServer
server = PlaygroundServer(agent_instance=self)
server.start(port=port, open_browser=True)
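Taken together, these agent changes mean a BaseTool instance can be passed directly in tools=[...]: a BaseComputerTool doubles as the computer handler, and its name/description/parameters become the function schema. A rough usage sketch, reusing classes introduced elsewhere in this PR:

from agent import ComputerAgent
from agent.tools import BrowserTool
from computer import Computer

computer = Computer(provider_type="docker", os_type="linux", image="cua-xfce:dev")
# In a real run, await computer.run() first (see the example script earlier in this PR).
browser = BrowserTool(interface=computer)  # BaseComputerTool instance

agent = ComputerAgent(
    model="azure_ml/Fara-7B",
    tools=[browser],  # detected as BaseComputerTool, so it also serves as computer_handler
)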


@@ -6,6 +6,7 @@ Agent loops for agent
from . import (
anthropic,
composed_grounded,
fara,
gelato,
gemini,
generic_vlm,
@@ -28,6 +29,7 @@ __all__ = [
"gelato",
"gemini",
"generic_vlm",
"fara",
"glm45v",
"gta1",
"holo",


@@ -0,0 +1,586 @@
"""
Fara-7B agent loop implementation using litellm with function/tool calling (Qwen3-VL-style prompting and tool schema).
- Passes a ComputerUse tool schema to acompletion
- Converts between Responses items and completion messages using helpers
"""
from __future__ import annotations
import json
import re
from typing import Any, Dict, List, Optional, Tuple
import litellm
from litellm.responses.litellm_completion_transformation.transformation import (
LiteLLMCompletionResponsesConfig,
)
from ..decorators import register_agent
from ..loops.base import AsyncAgentConfig
from ..responses import (
convert_completion_messages_to_responses_items,
convert_responses_items_to_completion_messages,
make_reasoning_item,
)
from ..types import AgentCapability
# ComputerUse tool schema (OpenAI function tool format)
QWEN3_COMPUTER_TOOL: Dict[str, Any] = {
"type": "function",
"function": {
"name": "computer",
"description": (
"Use a mouse and keyboard to interact with a computer, and take screenshots.\n"
"* This is an interface to a desktop GUI. You do not have access to a terminal or applications menu. You must click on desktop icons to start applications.\n"
"* Some applications may take time to start or process actions, so you may need to wait and take successive screenshots to see the results of your actions. E.g. if you click on Firefox and a window doesn't open, try wait and taking another screenshot.\n"
"* The screen's resolution is 1000x1000.\n"
"* Whenever you intend to move the cursor to click on an element like an icon, you should consult a screenshot to determine the coordinates of the element before moving the cursor.\n"
"* If you tried clicking on a program or link but it failed to load, even after waiting, try adjusting your cursor position so that the tip of the cursor visually falls on the element that you want to click.\n"
"* Make sure to click any buttons, links, icons, etc with the cursor tip in the center of the element. Don't click boxes on their edges."
),
"parameters": {
"type": "object",
"properties": {
"action": {
"description": "The action to perform.",
"enum": [
"key",
"type",
"mouse_move",
"left_click",
"left_click_drag",
"right_click",
"middle_click",
"double_click",
"triple_click",
"scroll",
"hscroll",
"screenshot",
"wait",
# "terminate",
# "answer",
],
"type": "string",
},
"keys": {
"description": "Required only by action=key.",
"type": "array",
"items": {"type": "string"},
},
"text": {
"description": "Required only by action=type and action=answer.",
"type": "string",
},
"coordinate": {
"description": "(x, y): Pixel coordinates from top-left.",
"type": "array",
"items": {"type": ["number", "integer"]},
"minItems": 2,
"maxItems": 2,
},
"pixels": {
"description": "Scroll amount. Positive=up, negative=down. For scroll/hscroll.",
"type": "number",
},
"time": {
"description": "Seconds to wait (action=wait).",
"type": "number",
},
# "status": {
# "description": "Task status (action=terminate).",
# "type": "string",
# "enum": ["success", "failure"],
# },
},
"required": ["action"],
},
},
}
def _build_nous_system(functions: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
"""Use qwen-agent NousFnCallPrompt to generate a system message embedding tool schema."""
try:
from qwen_agent.llm.fncall_prompts.nous_fncall_prompt import (
ContentItem as NousContentItem,
)
from qwen_agent.llm.fncall_prompts.nous_fncall_prompt import (
Message as NousMessage,
)
from qwen_agent.llm.fncall_prompts.nous_fncall_prompt import (
NousFnCallPrompt,
)
except ImportError:
raise ImportError(
"qwen-agent not installed. Please install it with `pip install cua-agent[qwen]`."
)
msgs = NousFnCallPrompt().preprocess_fncall_messages(
messages=[
NousMessage(
role="system", content=[NousContentItem(text="You are a helpful assistant.")]
)
],
functions=functions,
lang="en",
)
sys = msgs[0].model_dump()
# Convert qwen-agent structured content to OpenAI-style content list
content = [{"type": "text", "text": c["text"]} for c in sys.get("content", [])]
return {"role": "system", "content": content}
def _parse_tool_call_from_text(text: str) -> Optional[Dict[str, Any]]:
"""Extract JSON object within <tool_call>...</tool_call> from model text."""
m = re.search(r"<tool_call>\s*(\{[\s\S]*?\})\s*</tool_call>", text)
if not m:
return None
try:
return json.loads(m.group(1))
except Exception:
return None
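For illustration, this is the kind of model output the regex is meant to handle (values made up):

sample = (
    "I should click the search box.\n"
    '<tool_call>{"name": "computer", "arguments": {"action": "left_click", "coordinate": [512, 340]}}</tool_call>'
)
parsed = _parse_tool_call_from_text(sample)
# parsed == {"name": "computer", "arguments": {"action": "left_click", "coordinate": [512, 340]}}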
async def _unnormalize_coordinate(args: Dict[str, Any], dims: Tuple[int, int]) -> Dict[str, Any]:
"""Coordinates appear in 0..1000 space, scale to actual screen size using dims if provided."""
coord = args.get("coordinate")
if not coord or not isinstance(coord, (list, tuple)) or len(coord) < 2:
return args
x, y = float(coord[0]), float(coord[1])
width, height = float(dims[0]), float(dims[1])
x_abs = max(0.0, min(width, (x / 1000.0) * width))
y_abs = max(0.0, min(height, (y / 1000.0) * height))
args = {**args, "coordinate": [round(x_abs), round(y_abs)]}
return args
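A quick worked example of the scaling (dimensions and coordinates here are illustrative): with a resized screenshot of 1036x756, a model coordinate of (500, 250) in the 0..1000 reference space maps to (518, 189):

import asyncio

args = {"action": "left_click", "coordinate": [500, 250]}
scaled = asyncio.run(_unnormalize_coordinate(args, (1036, 756)))  # dims = (width, height)
# 500 / 1000 * 1036 = 518.0 -> 518;  250 / 1000 * 756 = 189.0 -> 189
# scaled == {"action": "left_click", "coordinate": [518, 189]}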
def convert_qwen_tool_args_to_computer_action(args: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""
Convert Qwen computer tool arguments to the Computer Calls action schema.
Qwen (example):
{"action": "left_click", "coordinate": [114, 68]}
Target (example):
{"action": "left_click", "x": 114, "y": 68}
Other mappings:
- right_click, middle_click, double_click (triple_click -> double_click)
- mouse_move -> { action: "move", x, y }
- key -> { action: "keypress", keys: [...] }
- type -> { action: "type", text }
- scroll/hscroll -> { action: "scroll", scroll_x, scroll_y, x, y }
- wait -> { action: "wait" }
- terminate/answer are not direct UI actions; return None for now
"""
if not isinstance(args, dict):
return None
action = args.get("action")
if not isinstance(action, str):
return None
# Coordinates helper
coord = args.get("coordinate")
x = y = None
if isinstance(coord, (list, tuple)) and len(coord) >= 2:
try:
x = int(round(float(coord[0])))
y = int(round(float(coord[1])))
except Exception:
x = y = None
# Map actions
a = action.lower()
if a in {"left_click", "right_click", "middle_click", "double_click"}:
if x is None or y is None:
return None
return {"action": a, "x": x, "y": y}
if a == "triple_click":
# Approximate as double_click
if x is None or y is None:
return None
return {"action": "double_click", "x": x, "y": y}
if a == "mouse_move":
if x is None or y is None:
return None
return {"action": "move", "x": x, "y": y}
if a == "key":
keys = args.get("keys")
if isinstance(keys, list) and all(isinstance(k, str) for k in keys):
return {"action": "keypress", "keys": keys}
return None
if a == "type":
text = args.get("text")
if isinstance(text, str):
return {"action": "type", "text": text}
return None
if a in {"scroll", "hscroll"}:
pixels = args.get("pixels") or 0
try:
pixels_val = int(round(float(pixels)))
except Exception:
pixels_val = 0
scroll_x = pixels_val if a == "hscroll" else 0
scroll_y = pixels_val if a == "scroll" else 0
# Include cursor position if available (optional)
out: Dict[str, Any] = {"action": "scroll", "scroll_x": scroll_x, "scroll_y": scroll_y}
if x is not None and y is not None:
out.update({"x": x, "y": y})
return out
if a == "wait":
return {"action": "wait"}
# Non-UI or terminal actions: terminate/answer -> not mapped here
return None
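Concretely (values made up), a Qwen-style scroll call converts as follows; pixels become scroll_y for vertical scrolls and scroll_x for hscroll, and the optional coordinate carries over as x/y:

qwen_args = {"action": "scroll", "coordinate": [400, 300], "pixels": -120}
convert_qwen_tool_args_to_computer_action(qwen_args)
# -> {"action": "scroll", "scroll_x": 0, "scroll_y": -120, "x": 400, "y": 300}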
@register_agent(models=r"(?i).*fara-7b.*")
class FaraVlmConfig(AsyncAgentConfig):
async def predict_step(
self,
messages: List[Dict[str, Any]],
model: str,
tools: Optional[List[Dict[str, Any]]] = None,
max_retries: Optional[int] = None,
stream: bool = False,
computer_handler=None,
use_prompt_caching: Optional[bool] = False,
_on_api_start=None,
_on_api_end=None,
_on_usage=None,
_on_screenshot=None,
**kwargs,
) -> Dict[str, Any]:
# Build messages using NousFnCallPrompt system with tool schema in text
# Start with converted conversation (images/text preserved)
converted_msgs = convert_responses_items_to_completion_messages(
messages, allow_images_in_tool_results=False, use_xml_tools=True
)
# Build function schemas from tools array
function_schemas = []
if tools:
from ..computers import is_agent_computer
for tool in tools:
tool_type = tool.get("type")
if tool_type == "computer":
# For computer tools, use QWEN3_COMPUTER_TOOL schema
computer = tool.get("computer")
if computer and is_agent_computer(computer):
function_schemas.append(QWEN3_COMPUTER_TOOL["function"])
elif tool_type == "function":
# For function tools, use the provided function schema
function_schema = tool.get("function")
if function_schema:
function_schemas.append(function_schema)
# If no tools provided or no computer tool found, use default QWEN3_COMPUTER_TOOL
if not function_schemas:
function_schemas = [QWEN3_COMPUTER_TOOL["function"]]
# Prepend Nous-generated system if available
nous_system = _build_nous_system(function_schemas)
completion_messages = ([nous_system] if nous_system else []) + converted_msgs
# If there is no screenshot in the conversation, take one now and inject it.
# Also record a pre_output_items assistant message to reflect action.
def _has_any_image(msgs: List[Dict[str, Any]]) -> bool:
for m in msgs:
content = m.get("content")
if isinstance(content, list):
for p in content:
if isinstance(p, dict) and p.get("type") == "image_url":
return True
return False
pre_output_items: List[Dict[str, Any]] = []
if not _has_any_image(completion_messages):
if computer_handler is None or not hasattr(computer_handler, "screenshot"):
raise RuntimeError(
"No screenshots present and computer_handler.screenshot is not available."
)
screenshot_b64 = await computer_handler.screenshot()
if not screenshot_b64:
raise RuntimeError("Failed to capture screenshot from computer_handler.")
await _on_screenshot(screenshot_b64, "screenshot_before")
# Check if computer_handler has get_current_url method
screenshot_text = "Here is the next screenshot. Think about what to do next."
if hasattr(computer_handler, "get_current_url"):
try:
current_url = await computer_handler.get_current_url()
screenshot_text = f"Current URL: {current_url[:100]}\nHere is the next screenshot. Think about what to do next."
except Exception:
# If get_current_url fails, fall back to default text
pass
else:
print(computer_handler)
print("HAS ATTR get_current_url", hasattr(computer_handler, "get_current_url"))
# Inject a user message with the screenshot so the model can see current context
completion_messages.append(
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {"url": f"data:image/png;base64,{screenshot_b64}"},
},
{"type": "text", "text": screenshot_text},
],
}
)
# Smart-resize all screenshots and attach min/max pixel hints. Fail fast if deps missing.
# Also record the last resized width/height to unnormalize coordinates later.
last_rw: Optional[int] = None
last_rh: Optional[int] = None
MIN_PIXELS = 3136
MAX_PIXELS = 12845056
try:
import base64
import io
from PIL import Image # type: ignore
from qwen_vl_utils import smart_resize # type: ignore
except Exception:
raise ImportError(
"qwen-vl-utils not installed. Please install it with `pip install cua-agent[qwen]`."
)
for msg in completion_messages:
content = msg.get("content")
if not isinstance(content, list):
continue
for part in content:
if isinstance(part, dict) and part.get("type") == "image_url":
url = ((part.get("image_url") or {}).get("url")) or ""
# Expect data URL like data:image/png;base64,<b64>
if url.startswith("data:") and "," in url:
b64 = url.split(",", 1)[1]
img_bytes = base64.b64decode(b64)
im = Image.open(io.BytesIO(img_bytes))
h, w = im.height, im.width
rh, rw = smart_resize(
h, w, factor=28, min_pixels=MIN_PIXELS, max_pixels=MAX_PIXELS
)
# Attach hints on this image block
part["min_pixels"] = MIN_PIXELS
part["max_pixels"] = MAX_PIXELS
last_rw, last_rh = rw, rh
api_kwargs: Dict[str, Any] = {
"model": model,
"messages": completion_messages,
"max_retries": max_retries,
"stream": stream,
**{k: v for k, v in kwargs.items()},
}
if use_prompt_caching:
api_kwargs["use_prompt_caching"] = use_prompt_caching
if _on_api_start:
await _on_api_start(api_kwargs)
response = await litellm.acompletion(**api_kwargs)
if _on_api_end:
await _on_api_end(api_kwargs, response)
usage = {
**LiteLLMCompletionResponsesConfig._transform_chat_completion_usage_to_responses_usage( # type: ignore
response.usage
).model_dump(),
"response_cost": response._hidden_params.get("response_cost", 0.0),
}
if _on_usage:
await _on_usage(usage)
# Extract response data
resp_dict = response.model_dump() # type: ignore
choice = (resp_dict.get("choices") or [{}])[0]
message = choice.get("message") or {}
content_text = message.get("content") or ""
tool_calls_array = message.get("tool_calls") or []
reasoning_text = message.get("reasoning") or ""
output_items: List[Dict[str, Any]] = []
# Add reasoning if present (Ollama Cloud format)
if reasoning_text:
output_items.append(make_reasoning_item(reasoning_text))
# Priority 1: Try to parse tool call from content text (OpenRouter format)
tool_call = _parse_tool_call_from_text(content_text)
if tool_call and isinstance(tool_call, dict):
fn_name = tool_call.get("name") or "computer"
raw_args = tool_call.get("arguments") or {}
# Unnormalize coordinates to actual screen size using last resized dims
if last_rw is None or last_rh is None:
raise RuntimeError(
"No screenshots found to derive dimensions for coordinate unnormalization."
)
args = await _unnormalize_coordinate(raw_args, (last_rw, last_rh))
# Build an OpenAI-style tool call so we can reuse the converter
fake_cm = {
"role": "assistant",
"tool_calls": [
{
"type": "function",
"id": "call_0",
"function": {
"name": fn_name,
"arguments": json.dumps(args),
},
}
],
}
output_items.extend(convert_completion_messages_to_responses_items([fake_cm]))
elif tool_calls_array:
# Priority 2: Use tool_calls field if present (Ollama Cloud format)
# Process and unnormalize coordinates in tool calls
processed_tool_calls = []
for tc in tool_calls_array:
function = tc.get("function", {})
fn_name = function.get("name", "computer")
args_str = function.get("arguments", "{}")
try:
args = json.loads(args_str)
# Unnormalize coordinates if present
if "coordinate" in args and last_rw is not None and last_rh is not None:
args = await _unnormalize_coordinate(args, (last_rw, last_rh))
# Convert Qwen format to Computer Calls format if this is a computer tool
if fn_name == "computer":
converted_action = convert_qwen_tool_args_to_computer_action(args)
if converted_action:
args = converted_action
processed_tool_calls.append(
{
"type": tc.get("type", "function"),
"id": tc.get("id", "call_0"),
"function": {
"name": fn_name,
"arguments": json.dumps(args),
},
}
)
except json.JSONDecodeError:
# Keep original if parsing fails
processed_tool_calls.append(tc)
fake_cm = {
"role": "assistant",
"content": content_text if content_text else "",
"tool_calls": processed_tool_calls,
}
output_items.extend(convert_completion_messages_to_responses_items([fake_cm]))
else:
# No tool calls found in either format, return text response
fake_cm = {"role": "assistant", "content": content_text}
output_items.extend(convert_completion_messages_to_responses_items([fake_cm]))
# Prepend any pre_output_items (e.g., simulated screenshot-taking message)
return {"output": (pre_output_items + output_items), "usage": usage}
def get_capabilities(self) -> List[AgentCapability]:
return ["step"]
async def predict_click(
self, model: str, image_b64: str, instruction: str, **kwargs
) -> Optional[Tuple[int, int]]:
"""
Predict click coordinates using Qwen3-VL via litellm.acompletion.
Only exposes a reduced tool schema with left_click to bias the model toward emitting a single click.
Returns (x, y) in absolute pixels when screen dimensions can be derived; otherwise integers in the normalized 0..1000 space.
"""
# Reduced tool
reduced_tool = {
"type": "function",
"function": {
**QWEN3_COMPUTER_TOOL["function"],
"parameters": {
"type": "object",
"properties": {
"action": {"type": "string", "enum": ["left_click"]},
"coordinate": {
"description": "(x, y) in 0..1000 reference space",
"type": "array",
"items": {"type": ["number", "integer"]},
"minItems": 2,
"maxItems": 2,
},
},
"required": ["action", "coordinate"],
},
},
}
# Build Nous system (lazy import inside helper already raises clear guidance if missing)
nous_system = _build_nous_system([reduced_tool["function"]])
# Pre-process using smart_resize
min_pixels = 3136
max_pixels = 12845056
try:
# Lazy import to avoid hard dependency
import base64
import io
# If PIL is available, estimate size from image to derive smart bounds
from PIL import Image
from qwen_vl_utils import smart_resize # type: ignore
img_bytes = base64.b64decode(image_b64)
im = Image.open(io.BytesIO(img_bytes))
h, w = im.height, im.width
rh, rw = smart_resize(h, w, factor=28, min_pixels=min_pixels, max_pixels=max_pixels)
except Exception:
raise ImportError(
"qwen-vl-utils not installed. Please install it with `pip install cua-agent[qwen]`."
)
messages = []
if nous_system:
messages.append(nous_system)
image_block: Dict[str, Any] = {
"type": "image_url",
"image_url": {"url": f"data:image/png;base64,{image_b64}"},
"min_pixels": min_pixels,
"max_pixels": max_pixels,
}
# Single user message with image and instruction, matching OpenAI-style content blocks
messages.append(
{
"role": "user",
"content": [
image_block,
{"type": "text", "text": instruction},
],
}
)
api_kwargs: Dict[str, Any] = {
"model": model,
"messages": messages,
**{k: v for k, v in kwargs.items()},
}
response = await litellm.acompletion(**api_kwargs)
resp = response.model_dump() # type: ignore
choice = (resp.get("choices") or [{}])[0]
content_text = ((choice.get("message") or {}).get("content")) or ""
tool_call = _parse_tool_call_from_text(content_text) or {}
args = tool_call.get("arguments") or {}
args = await _unnormalize_coordinate(args, (rw, rh))  # dims is (width, height)
coord = args.get("coordinate")
if isinstance(coord, (list, tuple)) and len(coord) >= 2:
return int(coord[0]), int(coord[1])
return None


@@ -258,8 +258,31 @@ class GenericVlmConfig(AsyncAgentConfig):
allow_images_in_tool_results=False,
)
# Build function schemas from tools array
function_schemas = []
if tools:
from ..computers import is_agent_computer
for tool in tools:
tool_type = tool.get("type")
if tool_type == "computer":
# For computer tools, use QWEN3_COMPUTER_TOOL schema
computer = tool.get("computer")
if computer and is_agent_computer(computer):
function_schemas.append(QWEN3_COMPUTER_TOOL["function"])
elif tool_type == "function":
# For function tools, use the provided function schema
function_schema = tool.get("function")
if function_schema:
function_schemas.append(function_schema)
# If no tools provided or no computer tool found, use default QWEN3_COMPUTER_TOOL
if not function_schemas:
function_schemas = [QWEN3_COMPUTER_TOOL["function"]]
# Prepend Nous-generated system if available
nous_system = _build_nous_system([QWEN3_COMPUTER_TOOL["function"]])
nous_system = _build_nous_system(function_schemas)
completion_messages = ([nous_system] if nous_system else []) + converted_msgs
# If there is no screenshot in the conversation, take one now and inject it.
@@ -273,6 +296,20 @@ class GenericVlmConfig(AsyncAgentConfig):
return True
return False
def _has_screenshot_message(msgs: List[Dict[str, Any]]) -> bool:
"""Check if messages already contain the 'Taking a screenshot' text."""
screenshot_text = "Taking a screenshot to see the current computer screen."
for m in msgs:
content = m.get("content")
if isinstance(content, str) and screenshot_text in content:
return True
if isinstance(content, list):
for p in content:
if isinstance(p, dict) and p.get("type") == "text":
if screenshot_text in (p.get("text") or ""):
return True
return False
pre_output_items: List[Dict[str, Any]] = []
if not _has_any_image(completion_messages):
if computer_handler is None or not hasattr(computer_handler, "screenshot"):
@@ -295,19 +332,20 @@ class GenericVlmConfig(AsyncAgentConfig):
],
}
)
# Add assistant message to outputs to reflect the action, similar to composed_grounded.py
pre_output_items.append(
{
"type": "message",
"role": "assistant",
"content": [
{
"type": "text",
"text": "Taking a screenshot to see the current computer screen.",
}
],
}
)
# Add assistant message to outputs to reflect the action, only if not already present
if not _has_screenshot_message(messages):
pre_output_items.append(
{
"type": "message",
"role": "assistant",
"content": [
{
"type": "text",
"text": "Taking a screenshot to see the current computer screen.",
}
],
}
)
# Smart-resize all screenshots and attach min/max pixel hints. Fail fast if deps missing.
# Also record the last resized width/height to unnormalize coordinates later.


@@ -0,0 +1,5 @@
"""Playground server for CUA agents."""
from .server import PlaygroundServer
__all__ = ["PlaygroundServer"]


@@ -0,0 +1,301 @@
"""Playground server implementation for CUA agents."""
import asyncio
import logging
import os
import platform
import socket
import traceback
import webbrowser
from typing import Any, Dict, List, Optional, Union
from urllib.parse import quote
import uvicorn
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
logger = logging.getLogger(__name__)
class PlaygroundServer:
"""Playground server for running CUA agents via HTTP API."""
def __init__(self, agent_instance=None):
"""
Initialize the playground server.
Args:
agent_instance: Optional pre-configured agent instance to use
"""
self.agent_instance = agent_instance
self.app = FastAPI(
title="CUA Playground Server",
description="Playground server for CUA agents",
version="0.1.0",
)
self._setup_middleware()
self._setup_routes()
self.server = None
self.port = None
def _setup_middleware(self):
"""Setup CORS middleware."""
self.app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
def _setup_routes(self):
"""Setup API routes."""
@self.app.get("/status")
async def status():
"""Health check endpoint."""
sys = platform.system().lower()
if "darwin" in sys or sys in ("macos", "mac"):
os_type = "macos"
elif "windows" in sys:
os_type = "windows"
else:
os_type = "linux"
return {
"status": "ok",
"os_type": os_type,
"features": ["agent", "playground"],
}
@self.app.post("/responses")
async def responses_endpoint(request: Request):
"""
Run the ComputerAgent loop until no tool calls remain pending.
Body JSON:
{
"model": "...", # required
"input": "... or messages[]", # required
"agent_kwargs": { ... }, # optional, passed directly to ComputerAgent
"env": { ... } # optional env overrides for agent
}
"""
# Import here to avoid circular imports
try:
from agent import ComputerAgent
except ImportError:
raise HTTPException(status_code=501, detail="ComputerAgent not available")
# Parse request body
try:
body = await request.json()
except Exception as e:
raise HTTPException(status_code=400, detail=f"Invalid JSON body: {str(e)}")
model = body.get("model")
input_data = body.get("input")
if not model or input_data is None:
raise HTTPException(status_code=400, detail="'model' and 'input' are required")
agent_kwargs: Dict[str, Any] = body.get("agent_kwargs") or {}
env_overrides: Dict[str, str] = body.get("env") or {}
# Simple env override context
class _EnvOverride:
def __init__(self, overrides: Dict[str, str]):
self.overrides = overrides
self._original: Dict[str, Optional[str]] = {}
def __enter__(self):
for k, v in (self.overrides or {}).items():
self._original[k] = os.environ.get(k)
os.environ[k] = str(v)
def __exit__(self, exc_type, exc, tb):
for k, old in self._original.items():
if old is None:
os.environ.pop(k, None)
else:
os.environ[k] = old
# Convert input to messages
def _to_messages(data: Union[str, List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
if isinstance(data, str):
return [{"role": "user", "content": data}]
if isinstance(data, list):
return data
return []
messages = _to_messages(input_data)
error = None
with _EnvOverride(env_overrides):
# Use pre-configured agent if available, otherwise create new one
if self.agent_instance:
agent = self.agent_instance
else:
agent = ComputerAgent(model=model, **agent_kwargs) # type: ignore[arg-type]
total_output: List[Any] = []
total_usage: Dict[str, Any] = {}
pending_computer_call_ids = set()
try:
async for result in agent.run(messages):
total_output += result["output"]
# Try to collect usage if present
if (
isinstance(result, dict)
and "usage" in result
and isinstance(result["usage"], dict)
):
# Merge usage counters
for k, v in result["usage"].items():
if isinstance(v, (int, float)):
total_usage[k] = total_usage.get(k, 0) + v
else:
total_usage[k] = v
for msg in result.get("output", []):
if msg.get("type") == "computer_call":
pending_computer_call_ids.add(msg["call_id"])
elif msg.get("type") == "computer_call_output":
pending_computer_call_ids.discard(msg["call_id"])
elif msg.get("type") == "function_call":
pending_computer_call_ids.add(msg["call_id"])
elif msg.get("type") == "function_call_output":
pending_computer_call_ids.discard(msg["call_id"])
# exit if no pending computer calls
if not pending_computer_call_ids:
break
except Exception as e:
logger.error(f"Error running agent: {str(e)}")
logger.error(traceback.format_exc())
error = str(e)
# Build response payload
payload = {
"model": model,
"error": error,
"output": total_output,
"usage": total_usage,
"status": "completed" if not error else "failed",
}
# Response headers (CORS itself is handled by the middleware above)
headers = {
"Cache-Control": "no-cache",
"Connection": "keep-alive",
}
return JSONResponse(content=payload, headers=headers)
def _find_available_port(self, start_port: int = 8000, max_attempts: int = 100) -> int:
"""Find an available port starting from start_port."""
for port in range(start_port, start_port + max_attempts):
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("127.0.0.1", port))
return port
except OSError:
continue
raise RuntimeError(
f"Could not find an available port in range {start_port}-{start_port + max_attempts}"
)
async def start_async(self, port: Optional[int] = None, open_browser: bool = False):
"""
Start the playground server asynchronously.
Args:
port: Port to run the server on. If None, finds an available port.
open_browser: Whether to open the browser automatically.
"""
if port is None:
port = self._find_available_port()
self.port = port
host = f"http://localhost:{port}"
logger.info(f"Starting playground server on {host}")
if open_browser:
# Construct the playground URL
encoded_host = quote(host, safe="")
encoded_model = quote(self.agent_instance.model, safe="")
encoded_vnc_url = quote("http://localhost:8006/?autoconnect=true", safe="")
# Build URL with custom_model if agent instance is configured
playground_url = (
# f"http://cua.ai/dashboard/playground"
f"http://localhost:3000/dashboard/playground"
f"?host={encoded_host}"
f"&port={port}"
f"&id=localhost"
f"&name=localhost"
f"&custom_model={encoded_model}"
f"&custom_vnc_url={encoded_vnc_url}"
f"&vnc_password=null"
f"&resize=scale"
f"&fullscreen=true"
)
logger.info(f"Opening browser at: {playground_url}")
webbrowser.open(playground_url)
config = uvicorn.Config(
self.app,
host="0.0.0.0",
port=port,
log_level="info",
)
self.server = uvicorn.Server(config)
await self.server.serve()
def start(self, port: Optional[int] = None, open_browser: bool = False):
"""
Start the playground server (blocking).
Args:
port: Port to run the server on. If None, finds an available port.
open_browser: Whether to open the browser automatically.
"""
# Check if there's already a running event loop
try:
loop = asyncio.get_running_loop()
# If we're in an async context, schedule as a task
import threading
# Run the server in a separate thread to avoid blocking
server_thread = threading.Thread(
target=self._run_in_new_loop,
args=(port, open_browser),
daemon=True,
)
server_thread.start()
# Give the server a moment to start and open browser
import time
time.sleep(1)
except RuntimeError:
# No running loop, can use asyncio.run() safely
asyncio.run(self.start_async(port=port, open_browser=open_browser))
def _run_in_new_loop(self, port: Optional[int] = None, open_browser: bool = False):
"""Helper to run server in a new event loop (for threading)."""
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
try:
new_loop.run_until_complete(self.start_async(port=port, open_browser=open_browser))
finally:
new_loop.close()
async def stop(self):
"""Stop the playground server."""
if self.server:
logger.info("Stopping playground server")
await self.server.shutdown()
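A minimal sketch of calling the /responses endpoint once the server is up (the port, prompt, and agent_kwargs below are illustrative):

import httpx

payload = {
    "model": "azure_ml/Fara-7B",
    "input": "Open the downloads page",  # or a list of message dicts
    "agent_kwargs": {"trajectory_dir": "trajectories"},  # optional
}
resp = httpx.post("http://localhost:8000/responses", json=payload, timeout=600.0)
data = resp.json()
print(data["status"], len(data["output"]))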


@@ -445,6 +445,7 @@ def convert_responses_items_to_completion_messages(
messages: List[Dict[str, Any]],
allow_images_in_tool_results: bool = True,
send_multiple_user_images_per_parallel_tool_results: bool = False,
use_xml_tools: bool = False,
) -> List[Dict[str, Any]]:
"""Convert responses_items message format to liteLLM completion format.
@@ -453,7 +454,14 @@ def convert_responses_items_to_completion_messages(
allow_images_in_tool_results: If True, include images in tool role messages.
If False, send tool message + separate user message with image.
send_multiple_user_images_per_parallel_tool_results: If True, send multiple user images in parallel tool results.
use_xml_tools: If True, use XML-style <tool_call> tags instead of tool_calls array.
Also sends tool results as user messages instead of tool role.
"""
# Assert that allow_images_in_tool_results is False when use_xml_tools is True
if use_xml_tools:
assert (
not allow_images_in_tool_results
), "allow_images_in_tool_results must be False when use_xml_tools is True"
completion_messages = []
for i, message in enumerate(messages):
@@ -510,101 +518,168 @@ def convert_responses_items_to_completion_messages(
# Handle function calls
elif msg_type == "function_call":
# Add tool call to last assistant message or create new one
if not completion_messages or completion_messages[-1]["role"] != "assistant":
completion_messages.append({"role": "assistant", "content": "", "tool_calls": []})
if use_xml_tools:
# Use XML format instead of tool_calls array
if not completion_messages or completion_messages[-1]["role"] != "assistant":
completion_messages.append({"role": "assistant", "content": ""})
if "tool_calls" not in completion_messages[-1]:
completion_messages[-1]["tool_calls"] = []
# Ensure arguments is a JSON string (not a dict)
arguments = message.get("arguments")
if isinstance(arguments, dict):
arguments = json.dumps(arguments)
completion_messages[-1]["tool_calls"].append(
{
"id": message.get("call_id"),
"type": "function",
"function": {
"name": message.get("name"),
"arguments": message.get("arguments"),
},
}
)
# Format as XML tool call
tool_call_xml = f'<tool_call>{{"name": "{message.get("name")}", "arguments": {arguments}}}</tool_call>'
if completion_messages[-1]["content"]:
completion_messages[-1]["content"] += "\n" + tool_call_xml
else:
completion_messages[-1]["content"] = tool_call_xml
else:
# Add tool call to last assistant message or create new one
if not completion_messages or completion_messages[-1]["role"] != "assistant":
completion_messages.append(
{"role": "assistant", "content": "", "tool_calls": []}
)
if "tool_calls" not in completion_messages[-1]:
completion_messages[-1]["tool_calls"] = []
# Ensure arguments is a JSON string (not a dict)
arguments = message.get("arguments")
if isinstance(arguments, dict):
arguments = json.dumps(arguments)
completion_messages[-1]["tool_calls"].append(
{
"id": message.get("call_id"),
"type": "function",
"function": {
"name": message.get("name"),
"arguments": arguments,
},
}
)
# Handle computer calls
elif msg_type == "computer_call":
# Add tool call to last assistant message or create new one
if not completion_messages or completion_messages[-1]["role"] != "assistant":
completion_messages.append({"role": "assistant", "content": "", "tool_calls": []})
if use_xml_tools:
# Use XML format instead of tool_calls array
if not completion_messages or completion_messages[-1]["role"] != "assistant":
completion_messages.append({"role": "assistant", "content": ""})
if "tool_calls" not in completion_messages[-1]:
completion_messages[-1]["tool_calls"] = []
action = message.get("action", {})
# Format as XML tool call
tool_call_xml = f'<tool_call>{{"name": "computer", "arguments": {json.dumps(action)}}}</tool_call>'
if completion_messages[-1]["content"]:
completion_messages[-1]["content"] += "\n" + tool_call_xml
else:
completion_messages[-1]["content"] = tool_call_xml
else:
# Add tool call to last assistant message or create new one
if not completion_messages or completion_messages[-1]["role"] != "assistant":
completion_messages.append(
{"role": "assistant", "content": "", "tool_calls": []}
)
action = message.get("action", {})
completion_messages[-1]["tool_calls"].append(
{
"id": message.get("call_id"),
"type": "function",
"function": {"name": "computer", "arguments": json.dumps(action)},
}
)
if "tool_calls" not in completion_messages[-1]:
completion_messages[-1]["tool_calls"] = []
action = message.get("action", {})
completion_messages[-1]["tool_calls"].append(
{
"id": message.get("call_id"),
"type": "function",
"function": {"name": "computer", "arguments": json.dumps(action)},
}
)
# Handle function/computer call outputs
elif msg_type in ["function_call_output", "computer_call_output"]:
output = message.get("output")
call_id = message.get("call_id")
if isinstance(output, dict) and output.get("type") == "input_image":
if allow_images_in_tool_results:
# Handle image output as tool response (may not work with all APIs)
if use_xml_tools:
# When using XML tools, send all results as user messages
if isinstance(output, dict) and output.get("type") == "input_image":
# Send image as user message
completion_messages.append(
{
"role": "tool",
"tool_call_id": call_id,
"role": "user",
"content": [
{"type": "image_url", "image_url": {"url": output.get("image_url")}}
{
"type": "image_url",
"image_url": {"url": output.get("image_url")},
}
],
}
)
else:
# Determine if the next message is also a tool call output
next_type = None
if i + 1 < len(messages):
next_msg = messages[i + 1]
next_type = next_msg.get("type")
is_next_message_image_result = next_type in [
"computer_call_output",
]
# Send tool message + separate user message with image (OpenAI compatible)
completion_messages += (
[
# Send text result as user message
completion_messages.append(
{
"role": "user",
"content": str(output),
}
)
else:
# Standard tool message handling
if isinstance(output, dict) and output.get("type") == "input_image":
if allow_images_in_tool_results:
# Handle image output as tool response (may not work with all APIs)
completion_messages.append(
{
"role": "tool",
"tool_call_id": call_id,
"content": "[Execution completed. See screenshot below]",
},
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {"url": output.get("image_url")},
}
],
},
]
if send_multiple_user_images_per_parallel_tool_results
or (not is_next_message_image_result)
else [
{
"role": "tool",
"tool_call_id": call_id,
"content": "[Execution completed. See screenshot below]",
},
}
)
else:
# Determine if the next message is also a tool call output
next_type = None
if i + 1 < len(messages):
next_msg = messages[i + 1]
next_type = next_msg.get("type")
is_next_message_image_result = next_type in [
"computer_call_output",
]
# Send tool message + separate user message with image (OpenAI compatible)
completion_messages += (
[
{
"role": "tool",
"tool_call_id": call_id,
"content": "[Execution completed. See screenshot below]",
},
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {"url": output.get("image_url")},
}
],
},
]
if send_multiple_user_images_per_parallel_tool_results
or (not is_next_message_image_result)
else [
{
"role": "tool",
"tool_call_id": call_id,
"content": "[Execution completed. See screenshot below]",
},
]
)
else:
# Handle text output as tool response
completion_messages.append(
{"role": "tool", "tool_call_id": call_id, "content": str(output)}
)
else:
# Handle text output as tool response
completion_messages.append(
{"role": "tool", "tool_call_id": call_id, "content": str(output)}
)
return completion_messages
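As an illustration of the new flag (made-up input, comments abbreviate the output), the same function_call item renders either as a tool_calls entry or as inline <tool_call> text:

items = [{
    "type": "function_call",
    "call_id": "call_0",
    "name": "computer",
    "arguments": '{"action": "screenshot"}',
}]

default_msgs = convert_responses_items_to_completion_messages(items)
# -> [{"role": "assistant", "content": "", "tool_calls": [{"id": "call_0", "type": "function",
#       "function": {"name": "computer", "arguments": '{"action": "screenshot"}'}}]}]

xml_msgs = convert_responses_items_to_completion_messages(
    items, allow_images_in_tool_results=False, use_xml_tools=True
)
# -> [{"role": "assistant",
#       "content": '<tool_call>{"name": "computer", "arguments": {"action": "screenshot"}}</tool_call>'}]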


@@ -1,5 +1,24 @@
"""Tools for agent interactions."""
"""
Agent tools module.
Provides base classes and registered tools for agent interactions.
"""
from .base import (
TOOL_REGISTRY,
BaseComputerTool,
BaseTool,
get_registered_tools,
get_tool,
register_tool,
)
from .browser_tool import BrowserTool
__all__ = ["BrowserTool"]
__all__ = [
"BaseTool",
"BaseComputerTool",
"register_tool",
"get_registered_tools",
"get_tool",
"TOOL_REGISTRY",
"BrowserTool",
]


@@ -0,0 +1,253 @@
"""
Base tool interface and registration system for agent tools.
Provides a protocol for implementing tools that can be registered and discovered.
"""
import json
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Union
# Global tool registry
TOOL_REGISTRY: Dict[str, type] = {}
def register_tool(name: str, allow_overwrite: bool = False):
"""
Decorator to register a tool class with a given name.
Args:
name: The name to register the tool under
allow_overwrite: Whether to allow overwriting an existing tool with the same name
Returns:
Decorator function that registers the class
Example:
@register_tool("my_tool")
class MyTool(BaseTool):
...
"""
def decorator(cls):
if name in TOOL_REGISTRY:
if allow_overwrite:
print(f"Warning: Tool `{name}` already exists! Overwriting with class {cls}.")
else:
raise ValueError(
f"Tool `{name}` already exists! Please ensure that the tool name is unique."
)
if hasattr(cls, "name") and cls.name and (cls.name != name):
raise ValueError(
f'{cls.__name__}.name="{cls.name}" conflicts with @register_tool(name="{name}").'
)
cls.name = name
TOOL_REGISTRY[name] = cls
return cls
return decorator
def is_tool_schema(obj: dict) -> bool:
"""
Check if obj is a valid JSON schema describing a tool compatible with OpenAI's tool calling.
Example valid schema:
{
"name": "get_current_weather",
"description": "Get the current weather in a given location",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA"
},
"unit": {
"type": "string",
"enum": ["celsius", "fahrenheit"]
}
},
"required": ["location"]
}
}
"""
try:
# Basic structure validation
assert set(obj.keys()) == {"name", "description", "parameters"}
assert isinstance(obj["name"], str)
assert obj["name"].strip()
assert isinstance(obj["description"], str)
assert isinstance(obj["parameters"], dict)
# Parameters structure validation
assert "type" in obj["parameters"]
assert obj["parameters"]["type"] == "object"
assert "properties" in obj["parameters"]
assert isinstance(obj["parameters"]["properties"], dict)
if "required" in obj["parameters"]:
assert isinstance(obj["parameters"]["required"], list)
assert set(obj["parameters"]["required"]).issubset(
set(obj["parameters"]["properties"].keys())
)
return True
except (AssertionError, KeyError, TypeError):
return False
class BaseTool(ABC):
"""
Base class for all agent tools.
Tools must implement:
- name: str - The tool name (set by @register_tool decorator)
- description: property that returns str - Tool description
- parameters: property that returns dict - JSON schema for tool parameters
- call: method - Execute the tool with given parameters
"""
name: str = ""
def __init__(self, cfg: Optional[dict] = None):
"""
Initialize the tool.
Args:
cfg: Optional configuration dictionary
"""
self.cfg = cfg or {}
if not self.name:
raise ValueError(
f"You must set {self.__class__.__name__}.name, either by "
f"@register_tool(name=...) or explicitly setting "
f"{self.__class__.__name__}.name"
)
# Validate schema if parameters is a dict
if isinstance(self.parameters, dict):
if not is_tool_schema(
{"name": self.name, "description": self.description, "parameters": self.parameters}
):
raise ValueError(
"The parameters, when provided as a dict, must conform to a "
"valid openai-compatible JSON schema."
)
@property
@abstractmethod
def description(self) -> str:
"""Return the tool description."""
raise NotImplementedError
@property
@abstractmethod
def parameters(self) -> dict:
"""Return the JSON schema for tool parameters."""
raise NotImplementedError
@abstractmethod
def call(self, params: Union[str, dict], **kwargs) -> Union[str, list, dict]:
"""
Execute the tool with the given parameters.
Args:
params: The parameters for the tool call (JSON string or dict)
**kwargs: Additional keyword arguments
Returns:
The result of the tool execution
"""
raise NotImplementedError
def _verify_json_format_args(self, params: Union[str, dict]) -> dict:
"""
Verify and parse the parameters as JSON.
Args:
params: Parameters as string or dict
Returns:
Parsed parameters as dict
Raises:
ValueError: If parameters are not valid JSON or don't match schema
"""
if isinstance(params, str):
try:
params_json: dict = json.loads(params)
except json.JSONDecodeError as e:
raise ValueError(f"Parameters must be formatted as valid JSON: {e}")
else:
params_json: dict = params
# Validate against schema if using dict parameters
if isinstance(self.parameters, dict):
try:
# Basic validation of required fields
if "required" in self.parameters:
for field in self.parameters["required"]:
if field not in params_json:
raise ValueError(f'Required parameter "{field}" is missing')
except (KeyError, TypeError) as e:
raise ValueError(f"Invalid parameters: {e}")
return params_json
@property
def function(self) -> dict:
"""
Return the function information for this tool.
Returns:
Dict with tool metadata
"""
return {
"name": self.name,
"description": self.description,
"parameters": self.parameters,
}
def get_registered_tools() -> Dict[str, type]:
"""
Get all registered tools.
Returns:
Dictionary mapping tool names to tool classes
"""
return TOOL_REGISTRY.copy()
def get_tool(name: str) -> Optional[type]:
"""
Get a registered tool by name.
Args:
name: The tool name
Returns:
The tool class, or None if not found
"""
return TOOL_REGISTRY.get(name)
class BaseComputerTool(BaseTool):
"""
Base class for computer tools that can provide screenshots.
Computer tools must implement:
- All BaseTool requirements (name, description, parameters, call)
- screenshot() method that returns screenshot as base64 string
"""
@abstractmethod
async def screenshot(self) -> str:
"""
Take a screenshot of the computer/browser.
Returns:
Screenshot image data as base64-encoded string
"""
raise NotImplementedError
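# A minimal sketch of a concrete tool built on these base classes (the "echo" tool
# below is purely illustrative; it relies on register_tool setting .name, as
# described in the BaseTool docstring):
#
#     @register_tool("echo")
#     class EchoTool(BaseTool):
#         @property
#         def description(self) -> str:
#             return "Echo the provided text back to the caller."
#
#         @property
#         def parameters(self) -> dict:
#             return {
#                 "type": "object",
#                 "properties": {"text": {"type": "string"}},
#                 "required": ["text"],
#             }
#
#         def call(self, params, **kwargs):
#             args = self._verify_json_format_args(params)
#             return {"success": True, "echo": args["text"]}
#
#     get_tool("echo")       # -> EchoTool, looked up from the registry
#     EchoTool().function    # -> {"name": "echo", "description": ..., "parameters": ...}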

View File

@@ -1,10 +1,14 @@
"""
Browser Tool for agent interactions.
Allows agents to control a browser programmatically via Playwright.
Implements the computer_use action interface for comprehensive browser control.
"""
import asyncio
import logging
from typing import TYPE_CHECKING, Optional, Union
from .base import BaseComputerTool, register_tool
if TYPE_CHECKING:
from computer.interface import GenericComputerInterface
@@ -12,124 +16,408 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
@register_tool("computer_use")
class BrowserTool(BaseComputerTool):
"""
Browser tool that uses the computer SDK's interface to control a browser.
Implements a comprehensive computer_use action interface for browser control.
"""
def __init__(self, interface: "GenericComputerInterface", cfg: Optional[dict] = None):
"""
Initialize the BrowserTool.
Args:
interface: A GenericComputerInterface instance that provides playwright_exec
cfg: Optional configuration dictionary
"""
self.interface = interface
self.logger = logger
self._facts = [] # Store memorized facts
# Get initial screenshot to determine dimensions
self.viewport_width = None
self.viewport_height = None
self.resized_width = None
self.resized_height = None
# Try to initialize dimensions synchronously
try:
import asyncio
loop = asyncio.get_event_loop()
if loop.is_running():
# If we're in an async context, dimensions will be lazy-loaded
pass
else:
loop.run_until_complete(self._initialize_dimensions())
except Exception:
# Dimensions will be lazy-loaded on first use
pass
super().__init__(cfg)
async def _initialize_dimensions(self):
"""Initialize viewport and resized dimensions from screenshot."""
try:
import base64
import io
from PIL import Image
from qwen_vl_utils import smart_resize
# Take a screenshot to get actual dimensions
screenshot_b64 = await self.screenshot()
img_bytes = base64.b64decode(screenshot_b64)
im = Image.open(io.BytesIO(img_bytes))
# Store actual viewport size
self.viewport_width = im.width
self.viewport_height = im.height
# Calculate resized dimensions using smart_resize with factor=28
MIN_PIXELS = 3136
MAX_PIXELS = 12845056
rh, rw = smart_resize(
im.height, im.width, factor=28, min_pixels=MIN_PIXELS, max_pixels=MAX_PIXELS
)
self.resized_width = rw
self.resized_height = rh
except Exception as e:
# Fall back to defaults if initialization fails
logger.warning(f"Failed to initialize dimensions: {e}")
self.viewport_width = 1024
self.viewport_height = 768
self.resized_width = 1024
self.resized_height = 768
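# For intuition (assuming the standard qwen_vl_utils.smart_resize behavior of
# rounding each side to a multiple of `factor` within the pixel bounds): a
# 1920x1080 screenshot would be snapped to roughly 1932x1092, and that resized
# space is the coordinate system the model predicts clicks in.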
async def _proc_coords(self, x: float, y: float) -> tuple:
"""
Process coordinates by converting from resized space to viewport space.
Args:
x: X coordinate in resized space (0 to resized_width)
y: Y coordinate in resized space (0 to resized_height)
Returns:
Tuple of (viewport_x, viewport_y) in actual viewport pixels
"""
# Ensure dimensions are initialized
if self.resized_width is None or self.resized_height is None:
await self._initialize_dimensions()
# Convert from resized space to viewport space
# Normalize by resized dimensions, then scale to viewport dimensions
viewport_x = (x / self.resized_width) * self.viewport_width
viewport_y = (y / self.resized_height) * self.viewport_height
return int(round(viewport_x)), int(round(viewport_y))
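# Worked example of the mapping above (all dimensions assumed for illustration):
# with resized_width=1036, resized_height=756 and an actual 1280x800 viewport, a
# model-predicted click at (518, 378) maps to
#     x = (518 / 1036) * 1280 = 640
#     y = (378 / 756)  * 800  = 400
# so the center of the resized image lands on the center of the real viewport.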
@property
def description(self) -> str:
# Use resized dimensions if available, otherwise use defaults
width = self.resized_width if self.resized_width is not None else 1024
height = self.resized_height if self.resized_height is not None else 768
return f"Use a mouse and keyboard to interact with a computer, and take screenshots.\
* This is an interface to a desktop GUI. You do not have access to a terminal or applications menu. You must click on desktop icons to start applications.\
* Some applications may take time to start or process actions, so you may need to wait and take successive screenshots to see the results of your actions. E.g. if you click on Firefox and a window doesn't open, try waiting and taking another screenshot.\
* The screen's resolution is {width}x{height}.\
* Whenever you intend to move the cursor to click on an element like an icon, you should consult a screenshot to determine the coordinates of the element before moving the cursor.\
* If you tried clicking on a program or link but it failed to load, even after waiting, try adjusting your cursor position so that the tip of the cursor visually falls on the element that you want to click.\
* Make sure to click any buttons, links, icons, etc with the cursor tip in the center of the element. Don't click boxes on their edges unless asked.\
* When a separate scrollable container prominently overlays the webpage, if you want to scroll within it, you typically need to mouse_move() over it first and then scroll().\
* If a popup window appears that you want to close, if left_click() on the 'X' or close button doesn't work, try key(keys=['Escape']) to close it.\
* On some search bars, when you type(), you may need to press_enter=False and instead separately call left_click() on the search button to submit the search query. This is especially true of search bars that have auto-suggest popups for e.g. locations\
* For calendar widgets, you usually need to left_click() on arrows to move between months and left_click() on dates to select them; type() is not typically used to input dates there.".strip()
@property
def parameters(self) -> dict:
return {
"type": "object",
"properties": {
"action": {
"description": """The action to perform. The available actions are:
* key: Performs key down presses on the arguments passed in order, then performs key releases in reverse order. Includes 'Enter', 'Alt', 'Shift', 'Tab', 'Control', 'Backspace', 'Delete', 'Escape', 'ArrowUp', 'ArrowDown', 'ArrowLeft', 'ArrowRight', 'PageDown', 'PageUp', etc.
* type: Type a string of text on the keyboard.
* mouse_move: Move the cursor to a specified (x, y) pixel coordinate on the screen.
* left_click: Click the left mouse button.
* scroll: Performs a scroll of the mouse scroll wheel.
* visit_url: Visit a specified URL.
* web_search: Perform a web search with a specified query.
* history_back: Go back to the previous page in the browser history.
* pause_and_memorize_fact: Pause and memorize a fact for future reference.
* wait: Wait specified seconds for the change to happen.
* terminate: Terminate the current task and report its completion status.""",
"enum": [
"key",
"type",
"mouse_move",
"left_click",
"scroll",
"visit_url",
"web_search",
"history_back",
"pause_and_memorize_fact",
"wait",
"terminate",
],
"type": "string",
},
"keys": {"description": "Required only by action=key.", "type": "array"},
"text": {"description": "Required only by action=type.", "type": "string"},
"coordinate": {
"description": "(x, y) coordinates for mouse actions. Required only by action=left_click, action=mouse_move, and action=type.",
"type": "array",
},
"pixels": {
"description": "Amount of scrolling. Positive = up, Negative = down. Required only by action=scroll.",
"type": "number",
},
"url": {
"description": "The URL to visit. Required only by action=visit_url.",
"type": "string",
},
"query": {
"description": "The query to search for. Required only by action=web_search.",
"type": "string",
},
"fact": {
"description": "The fact to remember for the future. Required only by action=pause_and_memorize_fact.",
"type": "string",
},
"time": {
"description": "Seconds to wait. Required only by action=wait.",
"type": "number",
},
"status": {
"description": "Status of the task. Required only by action=terminate.",
"type": "string",
"enum": ["success", "failure"],
},
},
"required": ["action"],
}
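# Example payloads accepted by the schema above (all values are illustrative):
#
#     {"action": "visit_url", "url": "https://example.com"}
#     {"action": "left_click", "coordinate": [400, 300]}
#     {"action": "type", "text": "hello world", "coordinate": [400, 300]}
#     {"action": "scroll", "pixels": -500}            # negative scrolls down
#     {"action": "key", "keys": ["Control", "a"]}
#     {"action": "terminate", "status": "success"}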
def call(self, params: Union[str, dict], **kwargs) -> Union[str, dict]:
"""
Execute a browser action.
Args:
params: Action parameters (JSON string or dict)
**kwargs: Additional keyword arguments
Returns:
Result of the action execution
"""
# Verify and parse parameters
params_dict = self._verify_json_format_args(params)
action = params_dict.get("action")
if not action:
return {"success": False, "error": "action parameter is required"}
# Execute action synchronously by running async method in event loop
try:
loop = asyncio.get_event_loop()
if loop.is_running():
# Already inside a running event loop, so run_until_complete cannot be used;
# run the coroutine to completion in a separate worker thread instead.
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(asyncio.run, self._execute_action(action, params_dict))
result = future.result()
else:
result = loop.run_until_complete(self._execute_action(action, params_dict))
return result
except Exception as e:
self.logger.error(f"Error executing browser command '{command}': {e}")
logger.error(f"Error executing action {action}: {e}")
return {"success": False, "error": str(e)}
async def _execute_action(self, action: str, params: dict) -> dict:
"""Execute the specific action asynchronously."""
try:
if action == "key":
return await self._action_key(params)
elif action == "type":
return await self._action_type(params)
elif action == "mouse_move":
return await self._action_mouse_move(params)
elif action == "left_click":
return await self._action_left_click(params)
elif action == "scroll":
return await self._action_scroll(params)
elif action == "visit_url":
return await self._action_visit_url(params)
elif action == "web_search":
return await self._action_web_search(params)
elif action == "history_back":
return await self._action_history_back(params)
elif action == "pause_and_memorize_fact":
return await self._action_pause_and_memorize_fact(params)
elif action == "wait":
return await self._action_wait(params)
elif action == "terminate":
return await self._action_terminate(params)
else:
return {"success": False, "error": f"Unknown action: {action}"}
except Exception as e:
logger.error(f"Error in action {action}: {e}")
return {"success": False, "error": str(e)}
async def _action_key(self, params: dict) -> dict:
"""Press keys in sequence."""
keys = params.get("keys", [])
if not keys:
return {"success": False, "error": "keys parameter is required"}
# Convert keys to proper format and press via hotkey
try:
await self.interface.hotkey(*keys)
return {"success": True, "message": f"Pressed keys: {keys}"}
except Exception as e:
return {"success": False, "error": str(e)}
async def _action_type(self, params: dict) -> dict:
"""Type text."""
text = params.get("text")
if not text:
return {"success": False, "error": "text parameter is required"}
# If coordinate is provided, click there first
coordinate = params.get("coordinate")
if coordinate and len(coordinate) == 2:
await self.interface.playwright_exec("click", {"x": coordinate[0], "y": coordinate[1]})
result = await self.interface.playwright_exec("type", {"text": text})
return result
async def _action_mouse_move(self, params: dict) -> dict:
"""Move mouse to coordinates."""
coordinate = params.get("coordinate")
if not coordinate or len(coordinate) != 2:
return {"success": False, "error": "coordinate parameter [x, y] is required"}
await self.interface.move_cursor(coordinate[0], coordinate[1])
return {"success": True, "message": f"Moved cursor to {coordinate}"}
async def _action_left_click(self, params: dict) -> dict:
"""Click at coordinates."""
coordinate = params.get("coordinate")
if not coordinate or len(coordinate) != 2:
return {"success": False, "error": "coordinate parameter [x, y] is required"}
result = await self.interface.playwright_exec(
"click", {"x": coordinate[0], "y": coordinate[1]}
)
return result
async def _action_scroll(self, params: dict) -> dict:
"""Scroll the page."""
pixels = params.get("pixels", 0)
if pixels == 0:
return {"success": False, "error": "pixels parameter is required"}
# Positive = up (negative delta_y), Negative = down (positive delta_y)
result = await self.interface.playwright_exec("scroll", {"delta_x": 0, "delta_y": -pixels})
return result
async def _action_visit_url(self, params: dict) -> dict:
"""Visit a URL."""
url = params.get("url")
if not url:
return {"success": False, "error": "url parameter is required"}
result = await self.interface.playwright_exec("visit_url", {"url": url})
return result
async def _action_web_search(self, params: dict) -> dict:
"""Perform web search."""
query = params.get("query")
if not query:
return {"success": False, "error": "query parameter is required"}
result = await self.interface.playwright_exec("web_search", {"query": query})
return result
async def _action_history_back(self, params: dict) -> dict:
"""Go back in browser history."""
# Press Alt+Left arrow key combination
try:
await self.interface.hotkey("Alt", "ArrowLeft")
return {"success": True, "message": "Navigated back in history"}
except Exception as e:
return {"success": False, "error": str(e)}
async def _action_pause_and_memorize_fact(self, params: dict) -> dict:
"""Memorize a fact."""
fact = params.get("fact")
if not fact:
return {"success": False, "error": "fact parameter is required"}
self._facts.append(fact)
return {
"success": True,
"message": f"Memorized fact: {fact}",
"total_facts": len(self._facts),
}
async def _action_wait(self, params: dict) -> dict:
"""Wait for specified seconds."""
time = params.get("time", 0)
if time <= 0:
return {"success": False, "error": "time parameter must be positive"}
await asyncio.sleep(time)
return {"success": True, "message": f"Waited {time} seconds"}
async def _action_terminate(self, params: dict) -> dict:
"""Terminate and report status."""
status = params.get("status", "success")
message = f"Task terminated with status: {status}"
if self._facts:
message += f"\nMemorized facts: {self._facts}"
return {"success": True, "status": status, "message": message, "terminated": True}
# Legacy methods for backward compatibility
async def visit_url(self, url: str) -> dict:
"""
Navigate to a URL.
Args:
url: URL to visit
Returns:
Response dictionary with success status and current URL
"""
return await self._execute_command("visit_url", {"url": url})
"""Navigate to a URL."""
return await self._action_visit_url({"url": url})
async def click(self, x: int, y: int) -> dict:
"""
Click at coordinates.
Args:
x: X coordinate
y: Y coordinate
Returns:
Response dictionary with success status
"""
return await self._execute_command("click", {"x": x, "y": y})
"""Click at coordinates."""
return await self._action_left_click({"coordinate": [x, y]})
async def type(self, text: str) -> dict:
"""
Type text into the focused element.
Args:
text: Text to type
Returns:
Response dictionary with success status
"""
return await self._execute_command("type", {"text": text})
"""Type text into the focused element."""
return await self._action_type({"text": text})
async def scroll(self, delta_x: int, delta_y: int) -> dict:
"""
Scroll the page.
Args:
delta_x: Horizontal scroll delta
delta_y: Vertical scroll delta
Returns:
Response dictionary with success status
"""
return await self._execute_command("scroll", {"delta_x": delta_x, "delta_y": delta_y})
"""Scroll the page."""
return await self._action_scroll({"pixels": -delta_y})
async def web_search(self, query: str) -> dict:
"""
Navigate to a Google search for the query.
"""Navigate to a Google search for the query."""
return await self._action_web_search({"query": query})
Args:
query: Search query
Returns:
Response dictionary with success status and current URL
"""
return await self._execute_command("web_search", {"query": query})
async def screenshot(self) -> str:
"""Take a screenshot of the current browser page as a base64-encoded string."""
result = await self.interface.playwright_exec("screenshot", {})
if result.get("success") and result.get("screenshot"):
screenshot_b64 = result["screenshot"]
return screenshot_b64
else:
error = result.get("error", "Unknown error")
raise RuntimeError(f"Failed to take screenshot: {error}")
async def get_current_url(self) -> str:
"""Get the current URL of the browser page."""
result = await self.interface.playwright_exec("get_current_url", {})
if result.get("success") and result.get("url"):
return result["url"]
else:
error = result.get("error", "Unknown error")
raise RuntimeError(f"Failed to get current URL: {error}")

View File

@@ -202,6 +202,11 @@ class BrowserManager:
screenshot_b64 = base64.b64encode(screenshot_bytes).decode("utf-8")
return {"success": True, "screenshot": screenshot_b64}
elif cmd == "get_current_url":
# Get the current URL
current_url = self.page.url
return {"success": True, "url": current_url}
else:
return {"success": False, "error": f"Unknown command: {cmd}"}