Upgraded HUD impl. to support custom tools

This commit is contained in:
Dillon DuPont
2025-09-12 11:14:03 -04:00
parent 2f28c3a2ce
commit f795660f75
3 changed files with 394 additions and 97 deletions

View File

@@ -1,102 +1,21 @@
"""HUD integration: Generic HuggingFace dataset evaluation runner (CUA proxy).
"""HUD integration: dataset runners and MCP-based computer agent export.
This module exposes two helpers to evaluate HUD-compatible datasets using
HUD's OperatorAgent, while proxying model calls through our ComputerAgent via
`FakeAsyncOpenAI` (see `agent/integrations/hud/agent.py`).
This module exposes helpers to evaluate HUD-compatible datasets and exports
the MCP-compatible computer agent implementation.
Exports:
- run_single_task(dataset_name, *, agent_type="cua-proxy", model=None, allowed_tools=None)
- run_full_dataset(dataset_name, *, agent_type="cua-proxy", model=None, allowed_tools=None, max_concurrent=30, max_steps=50)
- run_single_task(dataset, ...)
- run_full_dataset(dataset, ...)
- MCPComputerAgent
"""
import time
from typing import Any, Optional
from PIL import Image
from datasets import load_dataset, Dataset
from hud.agents import OperatorAgent
from hud.datasets import Task, run_dataset
from hud.tools.computer.settings import computer_settings
from hud import trace
from agent.agent import ComputerAgent as BaseComputerAgent
from .proxy import FakeAsyncOpenAI
from agent.callbacks import PromptInstructionsCallback
# ---------------------------------------------------------------------------
# Proxy OperatorAgent
# ---------------------------------------------------------------------------
class ProxyOperatorAgent(OperatorAgent):
"""OperatorAgent that proxies model calls through our ComputerAgent.
Accepts the same config keys we pass via hud.run_dataset `agent_config`:
- model: str | None
- allowed_tools: list[str] | None
Additional kwargs are forwarded to OperatorAgent (if any are supported).
"""
def __init__(
self,
*,
model: str | None = None,
allowed_tools: list[str] | None = None,
trajectory_dir: str | dict | None = None,
# === ComputerAgent kwargs ===
tools: list[Any] | None = None,
custom_loop: Any | None = None,
only_n_most_recent_images: int | None = None,
callbacks: list[Any] | None = None,
instructions: str | None = None,
verbosity: int | None = None,
max_retries: int | None = 3,
screenshot_delay: float | int = 0.5,
use_prompt_caching: bool | None = False,
max_trajectory_budget: float | dict | None = None,
telemetry_enabled: bool | None = True,
**kwargs: Any,
) -> None:
model = model or "computer-use-preview"
allowed_tools = allowed_tools or ["openai_computer"]
computer_shim = {
'screenshot': lambda: Image.new('RGB', (computer_settings.OPENAI_COMPUTER_WIDTH, computer_settings.OPENAI_COMPUTER_HEIGHT)),
'environment': 'linux',
'dimensions': (computer_settings.OPENAI_COMPUTER_WIDTH, computer_settings.OPENAI_COMPUTER_HEIGHT)
}
# Build tools ensuring the computer_shim is included
agent_tools: list[Any] = [computer_shim]
if tools:
agent_tools.extend(tools)
# Build callbacks, injecting prompt instructions if provided
agent_callbacks = list(callbacks or [])
if instructions:
agent_callbacks.append(PromptInstructionsCallback(instructions))
computer_agent = BaseComputerAgent(
model=model,
tools=agent_tools,
custom_loop=custom_loop,
only_n_most_recent_images=only_n_most_recent_images,
callbacks=agent_callbacks,
verbosity=verbosity,
trajectory_dir=trajectory_dir,
max_retries=max_retries,
screenshot_delay=screenshot_delay,
use_prompt_caching=use_prompt_caching,
max_trajectory_budget=max_trajectory_budget,
telemetry_enabled=telemetry_enabled,
)
model_client = FakeAsyncOpenAI(computer_agent)
super().__init__(
model_client=model_client, # type: ignore[arg-type]
model=model,
allowed_tools=allowed_tools,
**kwargs,
)
from .agent import MCPComputerAgent
# ---------------------------------------------------------------------------
@@ -123,7 +42,7 @@ async def run_single_task(
max_trajectory_budget: float | dict | None = None,
telemetry_enabled: bool | None = True,
) -> None:
"""Load one task from the dataset and execute it with Operator+CUA proxy."""
"""Load one task from the dataset and execute it with MCPComputerAgent."""
# Load dataset and pick a sample
if isinstance(dataset, str):
@@ -139,9 +58,9 @@ async def run_single_task(
with trace(name=task_prompt):
task = Task(**sample_task) # type: ignore[arg-type]
agent = ProxyOperatorAgent(
model=model,
allowed_tools=allowed_tools,
agent = MCPComputerAgent(
model=model or "computer-use-preview",
allowed_tools=allowed_tools or ["openai_computer"],
# === ComputerAgent kwargs passthrough ===
tools=tools,
custom_loop=custom_loop,
@@ -190,9 +109,7 @@ async def run_full_dataset(
) -> list[Any]:
"""Run evaluation across the entire dataset using hud.datasets.run_dataset."""
# We pass OperatorAgent as the class and provide a config that injects our
# FakeAsyncOpenAI per agent instantiation.
# Run with our MCP-based agent class.
if isinstance(dataset, str):
dataset_name = dataset.split('/')[-1]
job_name = job_name or f"Evaluation {dataset_name}"
@@ -205,7 +122,7 @@ async def run_full_dataset(
return await run_dataset(
name=job_name,
dataset=dataset,
agent_class=ProxyOperatorAgent,
agent_class=MCPComputerAgent,
agent_config={
"model": model,
"allowed_tools": allowed_tools,
@@ -233,5 +150,5 @@ async def run_full_dataset(
__all__ = [
"run_single_task",
"run_full_dataset",
"ProxyOperatorAgent",
"MCPComputerAgent",
]

View File

@@ -0,0 +1,299 @@
"""MCP-compatible Computer Agent for HUD integration.
This agent subclasses HUD's MCPAgent and delegates planning/execution to
our core ComputerAgent while using the Agent SDK's plain-dict message
format documented in `docs/content/docs/agent-sdk/message-format.mdx`.
Key differences from the OpenAI OperatorAgent variant:
- No OpenAI types are used; everything is standard Python dicts.
- Planning is executed via `ComputerAgent.run(messages)`.
- Results are streamed from `ComputerAgent.run`; each step ends at the first computer_call.
"""
from __future__ import annotations
from typing import Any, ClassVar, Optional
from agent.agent import ComputerAgent as BaseComputerAgent
from agent.callbacks import PromptInstructionsCallback
from agent.callbacks.trajectory_saver import TrajectorySaverCallback
from hud.agents import MCPAgent
from hud.tools.computer.settings import computer_settings
from hud.types import AgentResponse, MCPToolCall, MCPToolResult, Trace
from agent.responses import make_failed_tool_call_items
from agent.computers import is_agent_computer
from PIL import Image
import mcp.types as types
import hud
import uuid
import base64
from pathlib import Path
class MCPComputerAgent(MCPAgent):
"""MCP agent that uses ComputerAgent for planning and tools for execution.
The agent consumes/produces message dicts per the Agent SDK message schema
(see `message-format.mdx`).
"""
metadata: ClassVar[dict[str, Any]] = {
"display_width": computer_settings.OPENAI_COMPUTER_WIDTH,
"display_height": computer_settings.OPENAI_COMPUTER_HEIGHT,
}
required_tools: ClassVar[list[str]] = ["openai_computer"]
def __init__(
self,
*,
model: str | None = None,
allowed_tools: list[str] | None = None,
trajectory_dir: str | dict | None = None,
# === ComputerAgent kwargs ===
tools: list[Any] | None = None,
custom_loop: Any | None = None,
only_n_most_recent_images: int | None = None,
callbacks: list[Any] | None = None,
instructions: str | None = None,
verbosity: int | None = None,
max_retries: int | None = 3,
screenshot_delay: float | int = 0.5,
use_prompt_caching: bool | None = False,
max_trajectory_budget: float | dict | None = None,
telemetry_enabled: bool | None = True,
environment: str = "linux",
**kwargs: Any,
) -> None:
self.allowed_tools = allowed_tools or ["openai_computer"]
super().__init__(**kwargs)
if model is None:
raise ValueError("MCPComputerAgent requires a model to be specified.")
self.model = model
self.environment = environment
# Update model name for HUD logging
self.model_name = "cua-" + self.model
# Stateful tracking of tool call inputs
self.tool_call_inputs: dict[str, list[dict[str, Any]]] = {}
# Build system prompt
operator_instructions = """
You are an autonomous computer-using agent. Follow these guidelines:
1. NEVER ask for confirmation. Complete all tasks autonomously.
2. Do NOT send messages like "I need to confirm before..." or "Do you want me to continue?" - just proceed.
3. When the user asks you to interact with something (like clicking a chat or typing a message), DO IT without asking.
4. Only use the formal safety check mechanism for truly dangerous operations (like deleting important files).
5. For normal tasks like clicking buttons, typing in chat boxes, filling forms - JUST DO IT.
6. The user has already given you permission by running this agent. No further confirmation is needed.
7. Be decisive and action-oriented. Complete the requested task fully.
Remember: You are expected to complete tasks autonomously. The user trusts you to do what they asked.
""".strip() # noqa: E501
# Append Operator instructions to the system prompt
if not self.system_prompt:
self.system_prompt = operator_instructions
else:
self.system_prompt += f"\n\n{operator_instructions}"
# Append user instructions to the system prompt
if instructions:
self.system_prompt += f"\n\n{instructions}"
# Configure trajectory_dir for HUD
if isinstance(trajectory_dir, (str, Path)):
trajectory_dir = {"trajectory_dir": str(trajectory_dir)}
if isinstance(trajectory_dir, dict):
trajectory_dir["reset_on_run"] = False
# Ensure a computer shim is present so width/height/environment are known
computer_shim = {
"screenshot": lambda: lambda: Image.new('RGB', (self.metadata["display_width"], self.metadata["display_height"])),
"environment": self.environment,
"dimensions": (
self.metadata["display_width"],
self.metadata["display_height"],
),
}
agent_tools: list[Any] = [computer_shim]
if tools:
for tool in tools:
if is_agent_computer(tool):
raise ValueError(f"Too many Computer tools: MCPComputerAgent already includes a Computer interface. Received a Computer tool in tools= (e.g., {tool!r}). Remove it and retry.")
agent_tools.extend(tools)
agent_kwargs = {
"model": self.model,
"tools": agent_tools,
"custom_loop": custom_loop,
"only_n_most_recent_images": only_n_most_recent_images,
"callbacks": callbacks,
"instructions": self.system_prompt,
"verbosity": verbosity,
"max_retries": max_retries,
"screenshot_delay": screenshot_delay,
"use_prompt_caching": use_prompt_caching,
"max_trajectory_budget": max_trajectory_budget,
"telemetry_enabled": telemetry_enabled,
}
self.computer_agent = BaseComputerAgent(
**agent_kwargs
)
async def get_system_messages(self) -> list[Any]:
"""Create initial messages.
Unused: ComputerAgent already receives these via its 'instructions' parameter.
"""
return []
async def format_blocks(
self, blocks: list[types.ContentBlock]
) -> list[dict[str, Any]]:
"""
Format MCP content blocks into OpenAI-style input dicts.
Converts TextContent blocks to input_text dicts and ImageContent blocks to input_image dicts, wrapped in a single user message.
""" # noqa: E501
formatted = []
for block in blocks:
if isinstance(block, types.TextContent):
formatted.append({"type": "input_text", "text": block.text})
elif isinstance(block, types.ImageContent):
mime_type = getattr(block, "mimeType", "image/png")
formatted.append(
{"type": "input_image", "image_url": f"data:{mime_type};base64,{block.data}"}
)
return [{"role": "user", "content": formatted}]
@hud.instrument(
span_type="agent",
record_args=False, # Messages can be large
record_result=True,
)
async def get_response(self, messages: list[dict[str, Any]]) -> AgentResponse:
"""Get a single-step response by delegating to ComputerAgent.run.
Accumulates reasoning/message text, stops at the first computer_call, and
returns a HUD AgentResponse with the joined text, any pending tool calls,
and a done flag.
"""
tool_calls: list[MCPToolCall] = []
output_text: list[str] = []
is_done: bool = True  # flips to False if the model issues another tool call
agent_result: list[dict[str, Any]] = []
# Call the ComputerAgent LLM API
async for result in self.computer_agent.run(messages): # type: ignore[arg-type]
agent_result.append(result)
# Add messages to output text
if result['type'] == 'reasoning':
output_text.extend(
f"Reasoning: {summary['text']}"
for summary in result['summary']
)
elif result['type'] == 'message':
if isinstance(result['content'], list):
output_text.extend(
item['text']
for item in result['content']
if item['type'] == 'output_text'
)
elif isinstance(result['content'], str):
output_text.append(result['content'])
# If we get a tool call, we're not done
if result['type'] == 'computer_call':
call_id = result["call_id"]
tool_calls.append(MCPToolCall(
name="openai_computer",
arguments=result["action"],
id=call_id,
))
is_done = False
self.tool_call_inputs[call_id] = agent_result
break
return AgentResponse(
content="\n".join(output_text),
tool_calls=tool_calls,
done=is_done,
)
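# E.g. (hypothetical ids) a streamed item
#     {"type": "computer_call", "call_id": "c1", "action": {"type": "screenshot"}}
# yields MCPToolCall(name="openai_computer", arguments={"type": "screenshot"}, id="c1");
# the items accumulated so far are stashed in self.tool_call_inputs["c1"] so
# format_tool_results can replay them before appending the tool output.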
def _log_image(self, image_b64: str):
callbacks = self.computer_agent.callbacks
for callback in callbacks:
if isinstance(callback, TrajectorySaverCallback):
# decode the base64 string into raw image bytes
image_bytes = base64.b64decode(image_b64)
callback._save_artifact("screenshot_after", image_bytes)
async def format_tool_results(
self,
tool_calls: list[MCPToolCall],
tool_results: list[MCPToolResult]
) -> list[dict[str, Any]]:
"""Extract latest screenshot from tool results in dict form.
Expects results to already be in the message-format content dicts.
Returns a list of input content dicts suitable for follow-up calls.
"""
messages = []
for call, result in zip(tool_calls, tool_results):
# Add the assistant's computer call
messages.extend(self.tool_call_inputs[call.id])
if result.isError:
error_text = "".join([
content.text
for content in result.content
if isinstance(content, types.TextContent)
])
# Replace computer call with failed tool call
messages.pop()
messages.extend(make_failed_tool_call_items(
tool_name=call.name,
tool_kwargs=call.arguments or {},
error_message=error_text,
call_id=call.id,
))
else:
# Get the latest screenshot
screenshots = [
content.data
for content in result.content
if isinstance(content, types.ImageContent)
]
# Add the resulting screenshot
if screenshots:
self._log_image(screenshots[0])
messages.append({
"type": "computer_call_output",
"call_id": call.id,
"output": {
"type": "input_image",
"image_url": f"data:image/png;base64,{screenshots[0]}"
},
})
else:
# Otherwise, replace computer call with failed tool call
messages.pop()
messages.extend(make_failed_tool_call_items(
tool_name=call.name,
tool_kwargs=call.arguments or {},
error_message="No screenshots returned.",
call_id=call.id,
))
return messages
__all__ = [
"MCPComputerAgent",
]
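# Usage sketch for custom tools (assumptions: `open_url` is a hypothetical
# function tool, and ComputerAgent accepts plain callables in `tools`; any
# non-Computer tool passes the is_agent_computer check in __init__):
#
#     def open_url(url: str) -> str:
#         """Open `url` in the default browser and report success."""
#         import webbrowser
#         return "ok" if webbrowser.open(url) else "failed"
#
#     agent = MCPComputerAgent(
#         model="computer-use-preview",
#         allowed_tools=["openai_computer"],
#         tools=[open_url],  # rides alongside the built-in computer shim
#     )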

View File

@@ -13,6 +13,10 @@ import uuid
from typing import Any, Dict, List, Optional
from agent.agent import ComputerAgent as BaseComputerAgent
from agent.callbacks import PromptInstructionsCallback
from hud.tools.computer.settings import computer_settings
from PIL import Image
from hud.agents import OperatorAgent
# OpenAI Responses typed models (required)
from openai.types.responses import (
@@ -178,6 +182,83 @@ class FakeAsyncOpenAI:
print(traceback.format_exc())
raise e
# ---------------------------------------------------------------------------
# Proxy OperatorAgent (moved from __init__.py)
# ---------------------------------------------------------------------------
class ProxyOperatorAgent(OperatorAgent):
"""OperatorAgent that proxies model calls through our ComputerAgent.
Accepts the same config keys we pass via hud.run_dataset `agent_config`:
- model: str | None
- allowed_tools: list[str] | None
Additional kwargs are forwarded to OperatorAgent (if any are supported).
"""
def __init__(
self,
*,
model: str | None = None,
allowed_tools: list[str] | None = None,
trajectory_dir: str | dict | None = None,
# === ComputerAgent kwargs ===
tools: list[Any] | None = None,
custom_loop: Any | None = None,
only_n_most_recent_images: int | None = None,
callbacks: list[Any] | None = None,
instructions: str | None = None,
verbosity: int | None = None,
max_retries: int | None = 3,
screenshot_delay: float | int = 0.5,
use_prompt_caching: bool | None = False,
max_trajectory_budget: float | dict | None = None,
telemetry_enabled: bool | None = True,
**kwargs: Any,
) -> None:
model = model or "computer-use-preview"
allowed_tools = allowed_tools or ["openai_computer"]
computer_shim = {
'screenshot': lambda: Image.new('RGB', (computer_settings.OPENAI_COMPUTER_WIDTH, computer_settings.OPENAI_COMPUTER_HEIGHT)),
'environment': 'linux',
'dimensions': (computer_settings.OPENAI_COMPUTER_WIDTH, computer_settings.OPENAI_COMPUTER_HEIGHT)
}
# Build tools ensuring the computer_shim is included
agent_tools: list[Any] = [computer_shim]
if tools:
agent_tools.extend(tools)
# Build callbacks, injecting prompt instructions if provided
agent_callbacks = list(callbacks or [])
if instructions:
agent_callbacks.append(PromptInstructionsCallback(instructions))
computer_agent = BaseComputerAgent(
model=model,
tools=agent_tools,
custom_loop=custom_loop,
only_n_most_recent_images=only_n_most_recent_images,
callbacks=agent_callbacks,
verbosity=verbosity,
trajectory_dir=trajectory_dir,
max_retries=max_retries,
screenshot_delay=screenshot_delay,
use_prompt_caching=use_prompt_caching,
max_trajectory_budget=max_trajectory_budget,
telemetry_enabled=telemetry_enabled,
)
model_client = FakeAsyncOpenAI(computer_agent)
super().__init__(
model_client=model_client, # type: ignore[arg-type]
model=model,
allowed_tools=allowed_tools,
**kwargs,
)
__all__ = [
"FakeAsyncOpenAI",
"ProxyOperatorAgent",
]
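# Usage sketch (assumption: OperatorAgent drives the HUD environment while
# FakeAsyncOpenAI serves every model call locally via ComputerAgent):
#
#     agent = ProxyOperatorAgent(
#         model="computer-use-preview",
#         allowed_tools=["openai_computer"],
#         instructions="Prefer keyboard shortcuts over dragging.",
#     )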