Merge pull request #334 from trycua/integrations/hud

[Agent] Add HUD evals, OSWorld-verified docs, and support for custom computers
This commit is contained in:
ddupont
2025-08-13 08:33:01 -04:00
committed by GitHub
30 changed files with 112891 additions and 151 deletions

View File

@@ -28,12 +28,16 @@ With the Agent SDK, you can:
- run composed agents using UI grounding models and any LLM
- use any liteLLM provider (`openai/`, `openrouter/`, etc.) or our included local providers (`huggingface-local/`, `mlx/`)
- quickly evaluate new UI agent models and UI grounding models
- `anthropic/claude-opus-4-1-20250805`
- `anthropic/claude-opus-4-1-20250805` (using [Computer-Use Models](https://docs.trycua.com/docs/agent-sdk/supported-agents/computer-use-agents))
- `openai/computer-use-preview`
- `openrouter/z-ai/glm-4.5v`
- `huggingface-local/ByteDance-Seed/UI-TARS-1.5-7B`
- `omniparser+any LLM`
- `huggingface-local/HelloKKMe/GTA1-7B+any LLM` (using [Composed Agents](https://docs.trycua.com/docs/agent-sdk/supported-agents/composed-agents))
- `omniparser+{any LLM}` (using [Composed Agents](https://docs.trycua.com/docs/agent-sdk/supported-agents/composed-agents))
- `huggingface-local/HelloKKMe/GTA1-7B+{any LLM}`
- `huggingface/HelloKKMe/GTA1-32B+{any LLM}`
- `vllm_hosted/HelloKKMe/GTA1-72B+{any LLM}`
- `human/human` (using [Human-in-the-Loop](https://docs.trycua.com/docs/agent-sdk/supported-agents/human-in-the-loop))
- benchmark on OSWorld-Verified, SheetBench-V2, and more [with a single line of code using HUD](https://docs.trycua.com/docs/agent-sdk/integrations/hud) ([Notebook](https://github.com/trycua/cua/blob/main/notebooks/eval_osworld.ipynb))
Missing a model? [Raise a feature request](https://github.com/trycua/cua/issues/new?assignees=&labels=enhancement&projects=&title=%5BAgent%5D%3A+Add+model+support+for+) or [contribute](https://github.com/trycua/cua/blob/main/CONTRIBUTING.md)!

View File

@@ -3,6 +3,7 @@
"introduction",
"screenspot-v2",
"screenspot-pro",
"interactive"
"interactive",
"osworld-verified"
]
}

View File

@@ -0,0 +1,89 @@
---
title: OSWorld-Verified
description: Benchmark ComputerAgent on OSWorld tasks using HUD
---
OSWorld-Verified is a curated subset of OSWorld tasks that can be run using the HUD framework. Use ComputerAgent with HUD to benchmark on these tasks.
## Setup
```bash
pip install hud-python==0.2.10
```
Set environment variables:
```bash
export HUD_API_KEY="your_hud_key"
export ANTHROPIC_API_KEY="your_anthropic_key" # For Claude
export OPENAI_API_KEY="your_openai_key" # For OpenAI
```
## Quick Start
```python
import asyncio
from hud import gym, load_taskset
from agent.integrations.hud import ComputerAgent
async def run_osworld():
# Load taskset
taskset = await load_taskset("OSWorld-Verified")
test = taskset[144] # Example task
# Create environment (~2.5 min startup)
env = await gym.make(test)
# Create agent
agent = ComputerAgent(
model="anthropic/claude-3-5-sonnet-20241022", # any ComputerAgent model string
environment="linux"
)
# Run benchmark
obs, _ = await env.reset()
for i in range(100):
action, done = await agent.predict(obs)
obs, reward, terminated, info = await env.step(action)
if done or terminated:
break
# Evaluate results
result = await env.evaluate()
await env.close()
return result
# Run benchmark
result = asyncio.run(run_osworld())
print(f"Success: {result.get('success', False)}")
```
## Parallel Execution
Run all tasks in parallel using `run_job`:
```python
from agent.integrations.hud import run_job
from hud import load_taskset
from hud.taskset import TaskSet
import logging
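# Note: assumes an async context (e.g. a Jupyter notebook), hence the bare awaits below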
# Load taskset
taskset = await load_taskset("OSWorld-Verified")
taskset = TaskSet(tasks=taskset[:10]) # limit to 10 tasks instead of all 370
# Run benchmark job
job = await run_job(
model="openai/computer-use-preview",
task_or_taskset=taskset,
job_name="test-computeragent-job",
max_concurrent_tasks=5,
# add any extra ComputerAgent kwargs:
verbosity=logging.INFO, # Enable logging
# trajectory_dir=".." # Save trajectories locally
)
# Get results OR view them at app.hud.so
print(await job.get_analytics())
print(f"View results at: https://app.hud.so/jobs/{job.id}")
```

View File

@@ -0,0 +1,130 @@
---
title: Custom Computers
slug: custom-computer-handlers
---
The Agent SDK supports defining custom computer handlers using a simple dictionary interface. This enables integration with custom automation backends, testing frameworks, or specialized computer control systems.
## Example: Defining a Custom Computer Handler
```python
import asyncio
from PIL import Image
# Define your custom computer functions
async def take_screenshot():
"""Your custom screenshot implementation"""
# Return PIL Image, bytes, or base64 string
return Image.new('RGB', (1920, 1080), color='white')
# Create dict-based computer handler - only 'screenshot' is required
custom_computer = {
'screenshot': take_screenshot, # required
# everything below is optional
'environment': 'linux', # linux, mac, windows, browser
'dimensions': (1920, 1080), # (width, height)
'click': lambda x, y, button: print(f"Clicking at ({x}, {y}) with {button} button"),
}
```
You can then use this as a tool for your agent:
```python
from agent import ComputerAgent
agent = ComputerAgent(
model="anthropic/claude-3-5-sonnet-20240620",
tools=[custom_computer],
)
# Agent will automatically convert dict to agent.computers.CustomComputerHandler
async for _ in agent.run("Take a screenshot and click at coordinates 100, 200"):
pass
```
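In practice, `take_screenshot` would wrap a real capture backend. A minimal sketch, assuming Pillow's `ImageGrab` is available on your platform (not part of this PR):
```python
from PIL import ImageGrab

async def take_screenshot():
    """Capture the real screen; returns a PIL Image, which the handler accepts."""
    return ImageGrab.grab()
```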
## Class-Based Implementation
For more complex implementations, you can create a custom class by inheriting from `AsyncComputerHandler`:
```python
from agent.computers import AsyncComputerHandler
from PIL import Image
from typing import Literal, List, Dict, Union, Optional
class MyCustomComputer(AsyncComputerHandler):
"""Custom computer handler implementation."""
def __init__(self):
# Initialize your custom computer interface here
pass
# ==== Computer-Use-Preview Action Space ====
async def get_environment(self) -> Literal["windows", "mac", "linux", "browser"]:
"""Get the current environment type."""
...
async def get_dimensions(self) -> tuple[int, int]:
"""Get screen dimensions as (width, height)."""
...
async def screenshot(self) -> str:
"""Take a screenshot and return as base64 string."""
...
async def click(self, x: int, y: int, button: str = "left") -> None:
"""Click at coordinates with specified button."""
...
async def double_click(self, x: int, y: int) -> None:
"""Double click at coordinates."""
...
async def scroll(self, x: int, y: int, scroll_x: int, scroll_y: int) -> None:
"""Scroll at coordinates with specified scroll amounts."""
...
async def type(self, text: str) -> None:
"""Type text."""
...
async def wait(self, ms: int = 1000) -> None:
"""Wait for specified milliseconds."""
...
async def move(self, x: int, y: int) -> None:
"""Move cursor to coordinates."""
...
async def keypress(self, keys: Union[List[str], str]) -> None:
"""Press key combination."""
...
async def drag(self, path: List[Dict[str, int]]) -> None:
"""Drag along specified path."""
...
async def get_current_url(self) -> str:
"""Get current URL (for browser environments)."""
...
# ==== Anthropic Action Space ====
async def left_mouse_down(self, x: Optional[int] = None, y: Optional[int] = None) -> None:
"""Left mouse down at coordinates."""
...
async def left_mouse_up(self, x: Optional[int] = None, y: Optional[int] = None) -> None:
"""Left mouse up at coordinates."""
...
# Use with agent
custom_computer = MyCustomComputer()
agent = ComputerAgent(
model="anthropic/claude-3-5-sonnet-20240620",
tools=[custom_computer],
)
async for _ in agent.run("Take a screenshot and click at coordinates 100, 200"):
pass
```

View File

@@ -0,0 +1,49 @@
---
title: HUD Evals
description: Use ComputerAgent with HUD for benchmarking and evaluation
---
The HUD integration allows you to use ComputerAgent with the [HUD benchmarking framework](https://www.hud.so/), providing the same interface as existing HUD agents while leveraging ComputerAgent's capabilities.
## Installation
```bash
pip install "cua-agent[hud]"
# or install hud-python directly
# pip install hud-python==0.2.10
```
## Usage
```python
from agent.integrations.hud import run_job
from hud import load_taskset
from hud.taskset import TaskSet
import logging
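# Note: assumes an async context (e.g. a Jupyter notebook), hence the bare awaits below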
# Load taskset
taskset = await load_taskset("OSWorld-Verified")
taskset = TaskSet(tasks=taskset[:10]) # limit to 10 tasks instead of all 370
# Run benchmark job
job = await run_job(
model="openai/computer-use-preview",
# model="anthropic/claude-3-5-sonnet-20241022",
# model="huggingface-local/HelloKKMe/GTA1-7B+openai/gpt-5",
task_or_taskset=taskset,
job_name="test-computeragent-job",
max_concurrent_tasks=5,
# add any extra ComputerAgent kwargs:
verbosity=logging.INFO, # Enable logging
# trajectory_dir=".." # Save trajectories locally
)
# Get results OR view them at app.hud.so
print(await job.get_analytics())
print(f"View results at: https://app.hud.so/jobs/{job.id}")
```
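Judging by the `task_or_taskset` parameter name, `run_job` also accepts a single task. A sketch, assuming `taskset` from the snippet above and that `TaskSet` supports indexing like the loaded taskset (hypothetical usage, not shown in this PR):
```python
# Run one task from the set instead of the whole benchmark (assumes `taskset` from above)
job = await run_job(
    model="openai/computer-use-preview",
    task_or_taskset=taskset[0],
    job_name="single-task-run",
)
```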
**Available Benchmarks:**
1. [OSWorld-Verified](/agent-sdk/benchmarks/osworld-verified) - Benchmark on OSWorld tasks
See the [HUD docs](https://docs.hud.so/environment-creation) for more eval environments.

View File

@@ -0,0 +1,4 @@
{
"title": "Integrations",
"pages": ["hud"]
}

View File

@@ -7,10 +7,12 @@
"chat-history",
"callbacks",
"sandboxed-tools",
"custom-computer-handlers",
"local-models",
"prompt-caching",
"usage-tracking",
"benchmarks",
"migration-guide"
"migration-guide",
"integrations"
]
}

View File

@@ -28,12 +28,26 @@ Any model that supports `predict_click()` can be used as the grounding component
Any vision-enabled LiteLLM-compatible model can be used as the thinking component:
- **Anthropic**: `anthropic/claude-3-5-sonnet-20241022`, `anthropic/claude-3-opus-20240229`
- **OpenAI**: `openai/gpt-4o`, `openai/gpt-4-vision-preview`
- **OpenAI**: `openai/gpt-5`, `openai/o3`, `openai/gpt-4o`
- **Google**: `gemini/gemini-1.5-pro`, `vertex_ai/gemini-pro-vision`
- **Local models**: Any Hugging Face vision-language model
## Usage Examples
### GTA1 + GPT-5
Use OpenAI's GPT-5 for planning with specialized grounding:
```python
agent = ComputerAgent(
"huggingface-local/HelloKKMe/GTA1-7B+openai/gpt-5",
tools=[computer]
)
async for _ in agent.run("Take a screenshot, analyze the UI, and click on the most prominent button"):
pass
```
### GTA1 + Claude 3.5 Sonnet
Combine state-of-the-art grounding with powerful reasoning:
@@ -51,20 +65,6 @@ async for _ in agent.run("Open Firefox, navigate to github.com, and search for '
# - GTA1-7B provides precise click coordinates for each UI element
```
### GTA1 + Gemini Pro
Use Google's Gemini for planning with specialized grounding:
```python
agent = ComputerAgent(
"huggingface-local/HelloKKMe/GTA1-7B+gemini/gemini-1.5-pro",
tools=[computer]
)
async for _ in agent.run("Take a screenshot, analyze the UI, and click on the most prominent button"):
pass
```
### UI-TARS + GPT-4o
Combine two different vision models for enhanced capabilities:

View File

@@ -0,0 +1,66 @@
---
title: Human-In-The-Loop
description: Use humans as agents for evaluation, demonstrations, and interactive control
---
The Agent SDK provides a human tool with native support for human-in-the-loop control, useful for evaluating your environment and tools or for creating demonstrations. Use it with `grounding_model+human/human`, or with `human/human` directly.
## Getting Started
To start the human agent tool, simply run:
```bash
python -m agent.human_tool
```
The UI will show you pending completions. Select a completion to take control of the agent.
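The tool is backed by a small FastAPI server (default port 8002; see `agent/human_tool/server.py` later in this diff), so pending completions can also be inspected over its REST API. A minimal sketch:
```python
import requests

# List queued completion calls waiting for a human response
pending = requests.get("http://localhost:8002/pending").json()["pending_calls"]
for call in pending:
    print(call["id"], call["model"], call["created_at"])
```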
## Usage Examples
### Direct Human Agent
```python
from agent import ComputerAgent
from agent.computer import computer
agent = ComputerAgent(
"human/human",
tools=[computer]
)
async for _ in agent.run("Take a screenshot, analyze the UI, and click on the most prominent button"):
pass
```
### Composed with Grounding Model
```python
agent = ComputerAgent(
"huggingface-local/HelloKKMe/GTA1-7B+human/human",
tools=[computer]
)
async for _ in agent.run("Navigate to the settings page and enable dark mode"):
pass
```
## Features
The human-in-the-loop interface provides:
- **Interactive UI**: Web-based interface for reviewing and responding to agent requests
- **Image Display**: Screenshots with click handlers for direct interaction
- **Action Accordions**: Support for various computer actions (click, type, keypress, etc.)
- **Tool Calls**: Full OpenAI-compatible tool call support
- **Real-time Updates**: Smart polling for responsive UI updates
## Use Cases
- **Evaluation**: Have humans evaluate agent performance and provide ground truth responses
- **Demonstrations**: Create training data by having humans demonstrate tasks
- **Interactive Control**: Take manual control when automated agents need human guidance
- **Testing**: Validate agent, tool, and environment behavior manually
---
For more details on the human tool implementation, see the [Human Tool Documentation](../../tools/human-tool).

View File

@@ -4,6 +4,7 @@
"pages": [
"computer-use-agents",
"grounding-models",
"composed-agents"
"composed-agents",
"human-in-the-loop"
]
}

View File

@@ -3,7 +3,9 @@ Adapters package for agent - Custom LLM adapters for LiteLLM
"""
from .huggingfacelocal_adapter import HuggingFaceLocalAdapter
from .human_adapter import HumanAdapter
__all__ = [
"HuggingFaceLocalAdapter",
"HumanAdapter",
]

View File

@@ -1,5 +1,7 @@
import asyncio
import functools
import warnings
from concurrent.futures import ThreadPoolExecutor
from typing import Iterator, AsyncIterator, Dict, List, Any, Optional
from litellm.types.utils import GenericStreamingChunk, ModelResponse
from litellm.llms.custom_llm import CustomLLM
@@ -28,6 +30,7 @@ class HuggingFaceLocalAdapter(CustomLLM):
self.device = device
self.models = {} # Cache for loaded models
self.processors = {} # Cache for loaded processors
self._executor = ThreadPoolExecutor(max_workers=1) # Single thread pool
def _load_model_and_processor(self, model_name: str):
"""Load model and processor if not already cached.
@@ -51,7 +54,8 @@ class HuggingFaceLocalAdapter(CustomLLM):
processor = AutoProcessor.from_pretrained(
model_name,
min_pixels=3136,
max_pixels=4096 * 2160
max_pixels=4096 * 2160,
device_map=self.device
)
# Cache them
@@ -185,7 +189,11 @@ class HuggingFaceLocalAdapter(CustomLLM):
ModelResponse with generated text
"""
# Run _generate in thread pool to avoid blocking
generated_text = await asyncio.to_thread(self._generate, **kwargs)
loop = asyncio.get_event_loop()
generated_text = await loop.run_in_executor(
self._executor,
functools.partial(self._generate, **kwargs)
)
return await acompletion(
model=f"huggingface-local/{kwargs['model']}",
@@ -218,7 +226,11 @@ class HuggingFaceLocalAdapter(CustomLLM):
AsyncIterator of GenericStreamingChunk
"""
# Run _generate in thread pool to avoid blocking
generated_text = await asyncio.to_thread(self._generate, **kwargs)
loop = asyncio.get_event_loop()
generated_text = await loop.run_in_executor(
self._executor,
functools.partial(self._generate, **kwargs)
)
generic_streaming_chunk: GenericStreamingChunk = {
"finish_reason": "stop",

View File

@@ -0,0 +1,348 @@
import os
import asyncio
import requests
from typing import List, Dict, Any, Iterator, AsyncIterator
from litellm.types.utils import GenericStreamingChunk, ModelResponse
from litellm.llms.custom_llm import CustomLLM
from litellm import completion, acompletion
class HumanAdapter(CustomLLM):
"""Human Adapter for human-in-the-loop completions.
This adapter sends completion requests to a human completion server
where humans can review and respond to AI requests.
"""
def __init__(self, base_url: str | None = None, timeout: float = 300.0, **kwargs):
"""Initialize the human adapter.
Args:
base_url: Base URL for the human completion server.
Defaults to HUMAN_BASE_URL environment variable or http://localhost:8002
timeout: Timeout in seconds for waiting for human response
**kwargs: Additional arguments
"""
super().__init__()
self.base_url = base_url or os.getenv('HUMAN_BASE_URL', 'http://localhost:8002')
self.timeout = timeout
# Ensure base_url doesn't end with slash
self.base_url = self.base_url.rstrip('/')
def _queue_completion(self, messages: List[Dict[str, Any]], model: str) -> str:
"""Queue a completion request and return the call ID.
Args:
messages: Messages in OpenAI format
model: Model name
Returns:
Call ID for tracking the request
Raises:
Exception: If queueing fails
"""
try:
response = requests.post(
f"{self.base_url}/queue",
json={"messages": messages, "model": model},
timeout=10
)
response.raise_for_status()
return response.json()["id"]
except requests.RequestException as e:
raise Exception(f"Failed to queue completion request: {e}")
def _wait_for_completion(self, call_id: str) -> Dict[str, Any]:
"""Wait for human to complete the call.
Args:
call_id: ID of the queued completion call
Returns:
Dict containing response and/or tool_calls
Raises:
TimeoutError: If timeout is exceeded
Exception: If completion fails
"""
import time
start_time = time.time()
while True:
try:
# Check status
status_response = requests.get(f"{self.base_url}/status/{call_id}")
status_response.raise_for_status()
status_data = status_response.json()
if status_data["status"] == "completed":
result = {}
if "response" in status_data and status_data["response"]:
result["response"] = status_data["response"]
if "tool_calls" in status_data and status_data["tool_calls"]:
result["tool_calls"] = status_data["tool_calls"]
return result
elif status_data["status"] == "failed":
error_msg = status_data.get("error", "Unknown error")
raise Exception(f"Completion failed: {error_msg}")
# Check timeout
if time.time() - start_time > self.timeout:
raise TimeoutError(f"Timeout waiting for human response after {self.timeout} seconds")
# Wait before checking again
time.sleep(1.0)
except requests.RequestException as e:
if time.time() - start_time > self.timeout:
raise TimeoutError(f"Timeout waiting for human response: {e}")
# Continue trying if we haven't timed out
time.sleep(1.0)
async def _async_wait_for_completion(self, call_id: str) -> Dict[str, Any]:
"""Async version of wait_for_completion.
Args:
call_id: ID of the queued completion call
Returns:
Dict containing response and/or tool_calls
Raises:
TimeoutError: If timeout is exceeded
Exception: If completion fails
"""
import aiohttp
import time
start_time = time.time()
async with aiohttp.ClientSession() as session:
while True:
try:
# Check status
async with session.get(f"{self.base_url}/status/{call_id}") as response:
response.raise_for_status()
status_data = await response.json()
if status_data["status"] == "completed":
result = {}
if "response" in status_data and status_data["response"]:
result["response"] = status_data["response"]
if "tool_calls" in status_data and status_data["tool_calls"]:
result["tool_calls"] = status_data["tool_calls"]
return result
elif status_data["status"] == "failed":
error_msg = status_data.get("error", "Unknown error")
raise Exception(f"Completion failed: {error_msg}")
# Check timeout
if time.time() - start_time > self.timeout:
raise TimeoutError(f"Timeout waiting for human response after {self.timeout} seconds")
# Wait before checking again
await asyncio.sleep(1.0)
except Exception as e:
if time.time() - start_time > self.timeout:
raise TimeoutError(f"Timeout waiting for human response: {e}")
# Continue trying if we haven't timed out
await asyncio.sleep(1.0)
def _generate_response(self, messages: List[Dict[str, Any]], model: str) -> Dict[str, Any]:
"""Generate a human response for the given messages.
Args:
messages: Messages in OpenAI format
model: Model name
Returns:
Dict containing response and/or tool_calls
"""
# Queue the completion request
call_id = self._queue_completion(messages, model)
# Wait for human response
response = self._wait_for_completion(call_id)
return response
async def _async_generate_response(self, messages: List[Dict[str, Any]], model: str) -> Dict[str, Any]:
"""Async version of _generate_response.
Args:
messages: Messages in OpenAI format
model: Model name
Returns:
Dict containing response and/or tool_calls
"""
# Queue the completion request (sync operation)
call_id = self._queue_completion(messages, model)
# Wait for human response (async)
response = await self._async_wait_for_completion(call_id)
return response
def completion(self, *args, **kwargs) -> ModelResponse:
"""Synchronous completion method.
Returns:
ModelResponse with human-generated text or tool calls
"""
messages = kwargs.get('messages', [])
model = kwargs.get('model', 'human')
# Generate human response
human_response_data = self._generate_response(messages, model)
# Create ModelResponse with proper structure
from litellm.types.utils import ModelResponse, Choices, Message
import uuid
import time
# Create message content based on response type
if "tool_calls" in human_response_data and human_response_data["tool_calls"]:
# Tool calls response
message = Message(
role="assistant",
content=human_response_data.get("response", ""),
tool_calls=human_response_data["tool_calls"]
)
else:
# Text response
message = Message(
role="assistant",
content=human_response_data.get("response", "")
)
choice = Choices(
finish_reason="stop",
index=0,
message=message
)
result = ModelResponse(
id=f"human-{uuid.uuid4()}",
choices=[choice],
created=int(time.time()),
model=f"human/{model}",
object="chat.completion"
)
return result
async def acompletion(self, *args, **kwargs) -> ModelResponse:
"""Asynchronous completion method.
Returns:
ModelResponse with human-generated text or tool calls
"""
messages = kwargs.get('messages', [])
model = kwargs.get('model', 'human')
# Generate human response
human_response_data = await self._async_generate_response(messages, model)
# Create ModelResponse with proper structure
from litellm.types.utils import ModelResponse, Choices, Message
import uuid
import time
# Create message content based on response type
if "tool_calls" in human_response_data and human_response_data["tool_calls"]:
# Tool calls response
message = Message(
role="assistant",
content=human_response_data.get("response", ""),
tool_calls=human_response_data["tool_calls"]
)
else:
# Text response
message = Message(
role="assistant",
content=human_response_data.get("response", "")
)
choice = Choices(
finish_reason="stop",
index=0,
message=message
)
result = ModelResponse(
id=f"human-{uuid.uuid4()}",
choices=[choice],
created=int(time.time()),
model=f"human/{model}",
object="chat.completion"
)
return result
def streaming(self, *args, **kwargs) -> Iterator[GenericStreamingChunk]:
"""Synchronous streaming method.
Yields:
Streaming chunks with human-generated text or tool calls
"""
messages = kwargs.get('messages', [])
model = kwargs.get('model', 'human')
# Generate human response
human_response_data = self._generate_response(messages, model)
import time
# Handle tool calls vs text response
if "tool_calls" in human_response_data and human_response_data["tool_calls"]:
# Stream tool calls as a single chunk
generic_chunk: GenericStreamingChunk = {
"finish_reason": "tool_calls",
"index": 0,
"is_finished": True,
"text": human_response_data.get("response", ""),
"tool_use": human_response_data["tool_calls"],
"usage": {"completion_tokens": 1, "prompt_tokens": 0, "total_tokens": 1},
}
yield generic_chunk
else:
# Stream text response
response_text = human_response_data.get("response", "")
generic_chunk: GenericStreamingChunk = {
"finish_reason": "stop",
"index": 0,
"is_finished": True,
"text": response_text,
"tool_use": None,
"usage": {"completion_tokens": len(response_text.split()), "prompt_tokens": 0, "total_tokens": len(response_text.split())},
}
yield generic_chunk
async def astreaming(self, *args, **kwargs) -> AsyncIterator[GenericStreamingChunk]:
"""Asynchronous streaming method.
Yields:
Streaming chunks with human-generated text or tool calls
"""
messages = kwargs.get('messages', [])
model = kwargs.get('model', 'human')
# Generate human response
human_response_data = await self._async_generate_response(messages, model)
# Handle tool calls vs text response (mirrors the sync streaming method)
if "tool_calls" in human_response_data and human_response_data["tool_calls"]:
# Stream tool calls as a single chunk
generic_streaming_chunk: GenericStreamingChunk = {
"finish_reason": "tool_calls",
"index": 0,
"is_finished": True,
"text": human_response_data.get("response", ""),
"tool_use": human_response_data["tool_calls"],
"usage": {"completion_tokens": 1, "prompt_tokens": 0, "total_tokens": 1},
}
else:
# Stream text response
response_text = human_response_data.get("response", "")
generic_streaming_chunk = {
"finish_reason": "stop",
"index": 0,
"is_finished": True,
"text": response_text,
"tool_use": None,
"usage": {"completion_tokens": len(response_text.split()), "prompt_tokens": 0, "total_tokens": len(response_text.split())},
}
yield generic_streaming_chunk

View File

@@ -7,14 +7,16 @@ from typing import Dict, List, Any, Optional, AsyncGenerator, Union, cast, Calla
from litellm.responses.utils import Usage
from .types import Messages, Computer, AgentCapability
from .types import Messages, AgentCapability
from .decorators import find_agent_config
from .computer_handler import OpenAIComputerHandler, acknowledge_safety_check_callback, check_blocklisted_url
import json
import litellm
import litellm.utils
import inspect
from .adapters import HuggingFaceLocalAdapter
from .adapters import (
HuggingFaceLocalAdapter,
HumanAdapter,
)
from .callbacks import (
ImageRetentionCallback,
LoggingCallback,
@@ -22,9 +24,14 @@ from .callbacks import (
BudgetManagerCallback,
TelemetryCallback,
)
from .computers import (
AsyncComputerHandler,
is_agent_computer,
make_computer_handler
)
def get_json(obj: Any, max_depth: int = 10) -> Any:
def custom_serializer(o: Any, depth: int = 0, seen: Set[int] = None) -> Any:
def custom_serializer(o: Any, depth: int = 0, seen: Optional[Set[int]] = None) -> Any:
if seen is None:
seen = set()
@@ -211,8 +218,10 @@ class ComputerAgent:
hf_adapter = HuggingFaceLocalAdapter(
device="auto"
)
human_adapter = HumanAdapter()
litellm.custom_provider_map = [
{"provider": "huggingface-local", "custom_handler": hf_adapter}
{"provider": "huggingface-local", "custom_handler": hf_adapter},
{"provider": "human", "custom_handler": human_adapter}
]
litellm.suppress_debug_info = True
@@ -236,10 +245,6 @@ class ComputerAgent:
async def _initialize_computers(self):
"""Initialize computer objects"""
if not self.tool_schemas:
for tool in self.tools:
if hasattr(tool, '_initialized') and not tool._initialized:
await tool.run()
# Process tools and create tool schemas
self.tool_schemas = self._process_tools()
@@ -247,7 +252,7 @@ class ComputerAgent:
computer_handler = None
for schema in self.tool_schemas:
if schema["type"] == "computer":
computer_handler = OpenAIComputerHandler(schema["computer"].interface)
computer_handler = await make_computer_handler(schema["computer"])
break
self.computer_handler = computer_handler
@@ -263,7 +268,7 @@ class ComputerAgent:
for tool in self.tools:
# Check if it's a computer object (has interface attribute)
if hasattr(tool, 'interface'):
if is_agent_computer(tool):
# This is a computer tool - will be handled by agent loop
schemas.append({
"type": "computer",
@@ -398,7 +403,7 @@ class ComputerAgent:
# AGENT OUTPUT PROCESSING
# ============================================================================
async def _handle_item(self, item: Any, computer: Optional[Computer] = None, ignore_call_ids: Optional[List[str]] = None) -> List[Dict[str, Any]]:
async def _handle_item(self, item: Any, computer: Optional[AsyncComputerHandler] = None, ignore_call_ids: Optional[List[str]] = None) -> List[Dict[str, Any]]:
"""Handle each item; may cause a computer action + screenshot."""
if ignore_call_ids and item.get("call_id") and item.get("call_id") in ignore_call_ids:
return []
@@ -450,10 +455,12 @@ class ComputerAgent:
acknowledged_checks = []
for check in pending_checks:
check_message = check.get("message", str(check))
if acknowledge_safety_check_callback(check_message, allow_always=True): # TODO: implement a callback for safety checks
acknowledged_checks.append(check)
else:
raise ValueError(f"Safety check failed: {check_message}")
acknowledged_checks.append(check)
# TODO: implement a callback for safety checks
# if acknowledge_safety_check_callback(check_message, allow_always=True):
# acknowledged_checks.append(check)
# else:
# raise ValueError(f"Safety check failed: {check_message}")
# Create call output
call_output = {
@@ -466,11 +473,12 @@ class ComputerAgent:
},
}
# Additional URL safety checks for browser environments
if await computer.get_environment() == "browser":
current_url = await computer.get_current_url()
call_output["output"]["current_url"] = current_url
check_blocklisted_url(current_url)
# # Additional URL safety checks for browser environments
# if await computer.get_environment() == "browser":
# current_url = await computer.get_current_url()
# call_output["output"]["current_url"] = current_url
# # TODO: implement a callback for URL safety checks
# # check_blocklisted_url(current_url)
result = [call_output]
await self._on_computer_call_end(item, result)

View File

@@ -51,12 +51,14 @@ class TrajectorySaverCallback(AsyncCallbackHandler):
within the trajectory gets its own folder with screenshots and responses.
"""
def __init__(self, trajectory_dir: str):
def __init__(self, trajectory_dir: str, reset_on_run: bool = True):
"""
Initialize trajectory saver.
Args:
trajectory_dir: Base directory to save trajectories
reset_on_run: If True, reset trajectory_id/turn/artifact on each run.
If False, continue using existing trajectory_id if set.
"""
self.trajectory_dir = Path(trajectory_dir)
self.trajectory_id: Optional[str] = None
@@ -64,6 +66,7 @@ class TrajectorySaverCallback(AsyncCallbackHandler):
self.current_artifact: int = 0
self.model: Optional[str] = None
self.total_usage: Dict[str, Any] = {}
self.reset_on_run = reset_on_run
# Ensure trajectory directory exists
self.trajectory_dir.mkdir(parents=True, exist_ok=True)
@@ -113,32 +116,38 @@ class TrajectorySaverCallback(AsyncCallbackHandler):
async def on_run_start(self, kwargs: Dict[str, Any], old_items: List[Dict[str, Any]]) -> None:
"""Initialize trajectory tracking for a new run."""
model = kwargs.get("model", "unknown")
model_name_short = model.split("+")[-1].split("/")[-1].lower()[:16]
if "+" in model:
model_name_short = model.split("+")[0].lower()[:4] + "_" + model_name_short
# Only reset trajectory state if reset_on_run is True or no trajectory exists
if self.reset_on_run or not self.trajectory_id:
model_name_short = model.split("+")[-1].split("/")[-1].lower()[:16]
if "+" in model:
model_name_short = model.split("+")[0].lower()[:4] + "_" + model_name_short
# id format: yyyy-mm-dd_model_hhmmss_uuid[:4]
now = datetime.now()
self.trajectory_id = f"{now.strftime('%Y-%m-%d')}_{model_name_short}_{now.strftime('%H%M%S')}_{str(uuid.uuid4())[:4]}"
self.current_turn = 0
self.current_artifact = 0
self.model = model
self.total_usage = {}
# Create trajectory directory
trajectory_path = self.trajectory_dir / self.trajectory_id
trajectory_path.mkdir(parents=True, exist_ok=True)
# Save trajectory metadata
metadata = {
"trajectory_id": self.trajectory_id,
"created_at": str(uuid.uuid1().time),
"status": "running",
"kwargs": kwargs,
}
with open(trajectory_path / "metadata.json", "w") as f:
json.dump(metadata, f, indent=2)
# id format: yyyy-mm-dd_model_hhmmss_uuid[:4]
now = datetime.now()
self.trajectory_id = f"{now.strftime('%Y-%m-%d')}_{model_name_short}_{now.strftime('%H%M%S')}_{str(uuid.uuid4())[:4]}"
self.current_turn = 0
self.current_artifact = 0
self.model = model
self.total_usage = {}
# Create trajectory directory
trajectory_path = self.trajectory_dir / self.trajectory_id
trajectory_path.mkdir(parents=True, exist_ok=True)
# Save trajectory metadata
metadata = {
"trajectory_id": self.trajectory_id,
"created_at": str(uuid.uuid1().time),
"status": "running",
"kwargs": kwargs,
}
with open(trajectory_path / "metadata.json", "w") as f:
json.dump(metadata, f, indent=2)
else:
# Continue with existing trajectory - just update model if needed
self.model = model
@override
async def on_run_end(self, kwargs: Dict[str, Any], old_items: List[Dict[str, Any]], new_items: List[Dict[str, Any]]) -> None:

View File

@@ -0,0 +1,41 @@
"""
Computer handler factory and interface definitions.
This module provides a factory function to create computer handlers from different
computer interface types, supporting both the ComputerHandler protocol and the
Computer library interface.
"""
from .base import AsyncComputerHandler
from .cua import cuaComputerHandler
from .custom import CustomComputerHandler
from computer import Computer as cuaComputer
def is_agent_computer(computer):
"""Check if the given computer is a ComputerHandler or CUA Computer."""
return isinstance(computer, AsyncComputerHandler) or \
isinstance(computer, cuaComputer) or \
(isinstance(computer, dict)) #and "screenshot" in computer)
async def make_computer_handler(computer):
"""
Create a computer handler from a computer interface.
Args:
computer: Either a ComputerHandler instance, Computer instance, or dict of functions
Returns:
ComputerHandler: A computer handler instance
Raises:
ValueError: If the computer type is not supported
"""
if isinstance(computer, AsyncComputerHandler):
return computer
if isinstance(computer, cuaComputer):
computer_handler = cuaComputerHandler(computer)
await computer_handler._initialize()
return computer_handler
if isinstance(computer, dict):
return CustomComputerHandler(computer)
raise ValueError(f"Unsupported computer type: {type(computer)}")

View File

@@ -0,0 +1,70 @@
"""
Base computer interface protocol for agent interactions.
"""
from typing import Protocol, Literal, List, Dict, Any, Union, Optional, runtime_checkable
@runtime_checkable
class AsyncComputerHandler(Protocol):
"""Protocol defining the interface for computer interactions."""
# ==== Computer-Use-Preview Action Space ====
async def get_environment(self) -> Literal["windows", "mac", "linux", "browser"]:
"""Get the current environment type."""
...
async def get_dimensions(self) -> tuple[int, int]:
"""Get screen dimensions as (width, height)."""
...
async def screenshot(self) -> str:
"""Take a screenshot and return as base64 string."""
...
async def click(self, x: int, y: int, button: str = "left") -> None:
"""Click at coordinates with specified button."""
...
async def double_click(self, x: int, y: int) -> None:
"""Double click at coordinates."""
...
async def scroll(self, x: int, y: int, scroll_x: int, scroll_y: int) -> None:
"""Scroll at coordinates with specified scroll amounts."""
...
async def type(self, text: str) -> None:
"""Type text."""
...
async def wait(self, ms: int = 1000) -> None:
"""Wait for specified milliseconds."""
...
async def move(self, x: int, y: int) -> None:
"""Move cursor to coordinates."""
...
async def keypress(self, keys: Union[List[str], str]) -> None:
"""Press key combination."""
...
async def drag(self, path: List[Dict[str, int]]) -> None:
"""Drag along specified path."""
...
async def get_current_url(self) -> str:
"""Get current URL (for browser environments)."""
...
# ==== Anthropic Action Space ====
async def left_mouse_down(self, x: Optional[int] = None, y: Optional[int] = None) -> None:
"""Left mouse down at coordinates."""
...
async def left_mouse_up(self, x: Optional[int] = None, y: Optional[int] = None) -> None:
"""Left mouse up at coordinates."""
...

View File

@@ -4,35 +4,44 @@ Computer handler implementation for OpenAI computer-use-preview protocol.
import base64
from typing import Dict, List, Any, Literal, Union, Optional
from .types import Computer
from .base import AsyncComputerHandler
from computer import Computer
class OpenAIComputerHandler:
class cuaComputerHandler(AsyncComputerHandler):
"""Computer handler that implements the Computer protocol using the computer interface."""
def __init__(self, computer_interface):
def __init__(self, cua_computer: Computer):
"""Initialize with a computer interface (from tool schema)."""
self.interface = computer_interface
self.cua_computer = cua_computer
self.interface = None
async def _initialize(self):
if hasattr(self.cua_computer, '_initialized') and not self.cua_computer._initialized:
await self.cua_computer.run()
self.interface = self.cua_computer.interface
# ==== Computer-Use-Preview Action Space ====
async def get_environment(self) -> Literal["windows", "mac", "linux", "browser"]:
"""Get the current environment type."""
# For now, return a default - this could be enhanced to detect actual environment
return "windows"
# TODO: detect actual environment
return "linux"
async def get_dimensions(self) -> tuple[int, int]:
"""Get screen dimensions as (width, height)."""
assert self.interface is not None
screen_size = await self.interface.get_screen_size()
return screen_size["width"], screen_size["height"]
async def screenshot(self) -> str:
"""Take a screenshot and return as base64 string."""
assert self.interface is not None
screenshot_bytes = await self.interface.screenshot()
return base64.b64encode(screenshot_bytes).decode('utf-8')
async def click(self, x: int, y: int, button: str = "left") -> None:
"""Click at coordinates with specified button."""
assert self.interface is not None
if button == "left":
await self.interface.left_click(x, y)
elif button == "right":
@@ -43,28 +52,34 @@ class OpenAIComputerHandler:
async def double_click(self, x: int, y: int) -> None:
"""Double click at coordinates."""
assert self.interface is not None
await self.interface.double_click(x, y)
async def scroll(self, x: int, y: int, scroll_x: int, scroll_y: int) -> None:
"""Scroll at coordinates with specified scroll amounts."""
assert self.interface is not None
await self.interface.move_cursor(x, y)
await self.interface.scroll(scroll_x, scroll_y)
async def type(self, text: str) -> None:
"""Type text."""
assert self.interface is not None
await self.interface.type_text(text)
async def wait(self, ms: int = 1000) -> None:
"""Wait for specified milliseconds."""
assert self.interface is not None
import asyncio
await asyncio.sleep(ms / 1000.0)
async def move(self, x: int, y: int) -> None:
"""Move cursor to coordinates."""
assert self.interface is not None
await self.interface.move_cursor(x, y)
async def keypress(self, keys: Union[List[str], str]) -> None:
"""Press key combination."""
assert self.interface is not None
if isinstance(keys, str):
keys = keys.replace("-", "+").split("+")
if len(keys) == 1:
@@ -75,6 +90,7 @@ class OpenAIComputerHandler:
async def drag(self, path: List[Dict[str, int]]) -> None:
"""Drag along specified path."""
assert self.interface is not None
if not path:
return
@@ -99,23 +115,10 @@ class OpenAIComputerHandler:
# ==== Anthropic Computer Action Space ====
async def left_mouse_down(self, x: Optional[int] = None, y: Optional[int] = None) -> None:
"""Left mouse down at coordinates."""
assert self.interface is not None
await self.interface.mouse_down(x, y, button="left")
async def left_mouse_up(self, x: Optional[int] = None, y: Optional[int] = None) -> None:
"""Left mouse up at coordinates."""
await self.interface.mouse_up(x, y, button="left")
def acknowledge_safety_check_callback(message: str, allow_always: bool = False) -> bool:
"""Safety check callback for user acknowledgment."""
if allow_always:
return True
response = input(
f"Safety Check Warning: {message}\nDo you want to acknowledge and proceed? (y/n): "
).lower()
return response.strip() == "y"
def check_blocklisted_url(url: str) -> None:
"""Check if URL is blocklisted (placeholder implementation)."""
# This would contain actual URL checking logic
pass
assert self.interface is not None
await self.interface.mouse_up(x, y, button="left")

View File

@@ -0,0 +1,209 @@
"""
Custom computer handler implementation that accepts a dictionary of functions.
"""
import base64
from typing import Dict, List, Any, Literal, Union, Optional, Callable
from PIL import Image
import io
from .base import AsyncComputerHandler
class CustomComputerHandler(AsyncComputerHandler):
"""Computer handler that implements the Computer protocol using a dictionary of custom functions."""
def __init__(self, functions: Dict[str, Callable]):
"""
Initialize with a dictionary of functions.
Args:
functions: Dictionary where keys are method names and values are callable functions.
Only 'screenshot' is required, all others are optional.
Raises:
ValueError: If required 'screenshot' function is not provided.
"""
if 'screenshot' not in functions:
raise ValueError("'screenshot' function is required in functions dictionary")
self.functions = functions
self._last_screenshot_size: Optional[tuple[int, int]] = None
async def _call_function(self, func, *args, **kwargs):
"""
Call a function, handling both async and sync functions.
Args:
func: The function to call
*args: Positional arguments to pass to the function
**kwargs: Keyword arguments to pass to the function
Returns:
The result of the function call
"""
import asyncio
import inspect
if callable(func):
if inspect.iscoroutinefunction(func):
return await func(*args, **kwargs)
else:
return func(*args, **kwargs)
else:
return func
async def _get_value(self, attribute: str):
"""
Get value for an attribute, checking both 'get_{attribute}' and '{attribute}' keys.
Args:
attribute: The attribute name to look for
Returns:
The value from the functions dict, called if callable, returned directly if not
"""
# Check for 'get_{attribute}' first
get_key = f"get_{attribute}"
if get_key in self.functions:
return await self._call_function(self.functions[get_key])
# Check for '{attribute}'
if attribute in self.functions:
return await self._call_function(self.functions[attribute])
return None
def _to_b64_str(self, img: Union[bytes, Image.Image, str]) -> str:
"""
Convert image to base64 string.
Args:
img: Image as bytes, PIL Image, or base64 string
Returns:
str: Base64 encoded image string
"""
if isinstance(img, str):
# Already a base64 string
return img
elif isinstance(img, bytes):
# Raw bytes
return base64.b64encode(img).decode('utf-8')
elif isinstance(img, Image.Image):
# PIL Image
buffer = io.BytesIO()
img.save(buffer, format='PNG')
return base64.b64encode(buffer.getvalue()).decode('utf-8')
else:
raise ValueError(f"Unsupported image type: {type(img)}")
# ==== Computer-Use-Preview Action Space ====
async def get_environment(self) -> Literal["windows", "mac", "linux", "browser"]:
"""Get the current environment type."""
result = await self._get_value('environment')
if result is None:
return "linux"
assert result in ["windows", "mac", "linux", "browser"]
return result # type: ignore
async def get_dimensions(self) -> tuple[int, int]:
"""Get screen dimensions as (width, height)."""
result = await self._get_value('dimensions')
if result is not None:
return result # type: ignore
# Fallback: use last screenshot size if available
if not self._last_screenshot_size:
await self.screenshot()
assert self._last_screenshot_size is not None, "Failed to get screenshot size"
return self._last_screenshot_size
async def screenshot(self) -> str:
"""Take a screenshot and return as base64 string."""
result = await self._call_function(self.functions['screenshot'])
b64_str = self._to_b64_str(result) # type: ignore
# Try to extract dimensions for fallback use
try:
if isinstance(result, Image.Image):
self._last_screenshot_size = result.size
elif isinstance(result, bytes):
# Try to decode bytes to get dimensions
img = Image.open(io.BytesIO(result))
self._last_screenshot_size = img.size
except Exception:
# If we can't get dimensions, that's okay
pass
return b64_str
async def click(self, x: int, y: int, button: str = "left") -> None:
"""Click at coordinates with specified button."""
if 'click' in self.functions:
await self._call_function(self.functions['click'], x, y, button)
# No-op if not implemented
async def double_click(self, x: int, y: int) -> None:
"""Double click at coordinates."""
if 'double_click' in self.functions:
await self._call_function(self.functions['double_click'], x, y)
# No-op if not implemented
async def scroll(self, x: int, y: int, scroll_x: int, scroll_y: int) -> None:
"""Scroll at coordinates with specified scroll amounts."""
if 'scroll' in self.functions:
await self._call_function(self.functions['scroll'], x, y, scroll_x, scroll_y)
# No-op if not implemented
async def type(self, text: str) -> None:
"""Type text."""
if 'type' in self.functions:
await self._call_function(self.functions['type'], text)
# No-op if not implemented
async def wait(self, ms: int = 1000) -> None:
"""Wait for specified milliseconds."""
if 'wait' in self.functions:
await self._call_function(self.functions['wait'], ms)
else:
# Default implementation
import asyncio
await asyncio.sleep(ms / 1000.0)
async def move(self, x: int, y: int) -> None:
"""Move cursor to coordinates."""
if 'move' in self.functions:
await self._call_function(self.functions['move'], x, y)
# No-op if not implemented
async def keypress(self, keys: Union[List[str], str]) -> None:
"""Press key combination."""
if 'keypress' in self.functions:
await self._call_function(self.functions['keypress'], keys)
# No-op if not implemented
async def drag(self, path: List[Dict[str, int]]) -> None:
"""Drag along specified path."""
if 'drag' in self.functions:
await self._call_function(self.functions['drag'], path)
# No-op if not implemented
async def get_current_url(self) -> str:
"""Get current URL (for browser environments)."""
if 'get_current_url' in self.functions or 'current_url' in self.functions:
return await self._get_value('current_url') # type: ignore
return "" # Default fallback
async def left_mouse_down(self, x: Optional[int] = None, y: Optional[int] = None) -> None:
"""Left mouse down at coordinates."""
if 'left_mouse_down' in self.functions:
await self._call_function(self.functions['left_mouse_down'], x, y)
# No-op if not implemented
async def left_mouse_up(self, x: Optional[int] = None, y: Optional[int] = None) -> None:
"""Left mouse up at coordinates."""
if 'left_mouse_up' in self.functions:
await self._call_function(self.functions['left_mouse_up'], x, y)
# No-op if not implemented

View File

@@ -0,0 +1,29 @@
"""
Human-in-the-Loop Completion Tool
This package provides a human-in-the-loop completion system that allows
AI agents to request human assistance for complex decisions or responses.
Components:
- server.py: FastAPI server with completion queue management
- ui.py: Gradio UI for human interaction
- __main__.py: Combined server and UI application
Usage:
# Run the server and UI
python -m agent.human_tool
# Or run components separately
python -m agent.human_tool.server # API server only
python -m agent.human_tool.ui # UI only
"""
from .server import CompletionQueue, completion_queue
from .ui import HumanCompletionUI, create_ui
__all__ = [
"CompletionQueue",
"completion_queue",
"HumanCompletionUI",
"create_ui"
]

View File

@@ -0,0 +1,38 @@
#!/usr/bin/env python3
"""
Human-in-the-Loop Completion Server and UI
This module combines the FastAPI server for handling completion requests
with a Gradio UI for human interaction.
"""
import gradio as gr
from fastapi import FastAPI
from .server import app as fastapi_app
from .ui import create_ui
# Create the Gradio demo
gradio_demo = create_ui()
# Mount Gradio on FastAPI
CUSTOM_PATH = "/gradio"
app = gr.mount_gradio_app(fastapi_app, gradio_demo, path=CUSTOM_PATH)
# Root endpoint with links to the Gradio UI and API docs
@fastapi_app.get("/")
async def redirect_to_ui():
"""Return server info pointing to the Gradio UI and API docs."""
return {
"message": "Human Completion Server is running",
"ui_url": "/gradio",
"api_docs": "/docs"
}
if __name__ == "__main__":
import uvicorn
print("🚀 Starting Human-in-the-Loop Completion Server...")
print("📊 API Server: http://localhost:8002")
print("🎨 Gradio UI: http://localhost:8002/gradio")
print("📚 API Docs: http://localhost:8002/docs")
uvicorn.run(app, host="0.0.0.0", port=8002)

View File

@@ -0,0 +1,234 @@
import asyncio
import uuid
from datetime import datetime
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from enum import Enum
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
class CompletionStatus(str, Enum):
PENDING = "pending"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class CompletionCall:
id: str
messages: List[Dict[str, Any]]
model: str
status: CompletionStatus
created_at: datetime
completed_at: Optional[datetime] = None
response: Optional[str] = None
tool_calls: Optional[List[Dict[str, Any]]] = None
error: Optional[str] = None
class ToolCall(BaseModel):
id: str
type: str = "function"
function: Dict[str, Any]
class CompletionRequest(BaseModel):
messages: List[Dict[str, Any]]
model: str
class CompletionResponse(BaseModel):
response: Optional[str] = None
tool_calls: Optional[List[Dict[str, Any]]] = None
class CompletionQueue:
def __init__(self):
self._queue: Dict[str, CompletionCall] = {}
self._pending_order: List[str] = []
self._lock = asyncio.Lock()
async def add_completion(self, messages: List[Dict[str, Any]], model: str) -> str:
"""Add a completion call to the queue."""
async with self._lock:
call_id = str(uuid.uuid4())
completion_call = CompletionCall(
id=call_id,
messages=messages,
model=model,
status=CompletionStatus.PENDING,
created_at=datetime.now()
)
self._queue[call_id] = completion_call
self._pending_order.append(call_id)
return call_id
async def get_pending_calls(self) -> List[Dict[str, Any]]:
"""Get all pending completion calls."""
async with self._lock:
pending_calls = []
for call_id in self._pending_order:
if call_id in self._queue and self._queue[call_id].status == CompletionStatus.PENDING:
call = self._queue[call_id]
pending_calls.append({
"id": call.id,
"model": call.model,
"created_at": call.created_at.isoformat(),
"messages": call.messages
})
return pending_calls
async def get_call_status(self, call_id: str) -> Optional[Dict[str, Any]]:
"""Get the status of a specific completion call."""
async with self._lock:
if call_id not in self._queue:
return None
call = self._queue[call_id]
result = {
"id": call.id,
"status": call.status.value,
"created_at": call.created_at.isoformat(),
"model": call.model,
"messages": call.messages
}
if call.completed_at:
result["completed_at"] = call.completed_at.isoformat()
if call.response:
result["response"] = call.response
if call.tool_calls:
result["tool_calls"] = call.tool_calls
if call.error:
result["error"] = call.error
return result
async def complete_call(self, call_id: str, response: Optional[str] = None, tool_calls: Optional[List[Dict[str, Any]]] = None) -> bool:
"""Mark a completion call as completed with a response or tool calls."""
async with self._lock:
if call_id not in self._queue:
return False
call = self._queue[call_id]
if call.status != CompletionStatus.PENDING:
return False
call.status = CompletionStatus.COMPLETED
call.completed_at = datetime.now()
call.response = response
call.tool_calls = tool_calls
# Remove from pending order
if call_id in self._pending_order:
self._pending_order.remove(call_id)
return True
async def fail_call(self, call_id: str, error: str) -> bool:
"""Mark a completion call as failed with an error."""
async with self._lock:
if call_id not in self._queue:
return False
call = self._queue[call_id]
if call.status != CompletionStatus.PENDING:
return False
call.status = CompletionStatus.FAILED
call.completed_at = datetime.now()
call.error = error
# Remove from pending order
if call_id in self._pending_order:
self._pending_order.remove(call_id)
return True
async def wait_for_completion(self, call_id: str, timeout: float = 300.0) -> Optional[str]:
"""Wait for a completion call to be completed and return the response."""
start_time = asyncio.get_event_loop().time()
while True:
status = await self.get_call_status(call_id)
if not status:
return None
if status["status"] == CompletionStatus.COMPLETED.value:
return status.get("response")
elif status["status"] == CompletionStatus.FAILED.value:
raise Exception(f"Completion failed: {status.get('error', 'Unknown error')}")
# Check timeout
if asyncio.get_event_loop().time() - start_time > timeout:
await self.fail_call(call_id, "Timeout waiting for human response")
raise TimeoutError("Timeout waiting for human response")
# Wait a bit before checking again
await asyncio.sleep(0.5)
# Global queue instance
completion_queue = CompletionQueue()
# FastAPI app
app = FastAPI(title="Human Completion Server", version="1.0.0")
@app.post("/queue", response_model=Dict[str, str])
async def queue_completion(request: CompletionRequest):
"""Add a completion request to the queue."""
call_id = await completion_queue.add_completion(request.messages, request.model)
return {"id": call_id, "status": "queued"}
@app.get("/pending")
async def list_pending():
"""List all pending completion calls."""
pending_calls = await completion_queue.get_pending_calls()
return {"pending_calls": pending_calls}
@app.get("/status/{call_id}")
async def get_status(call_id: str):
"""Get the status of a specific completion call."""
status = await completion_queue.get_call_status(call_id)
if not status:
raise HTTPException(status_code=404, detail="Completion call not found")
return status
@app.post("/complete/{call_id}")
async def complete_call(call_id: str, response: CompletionResponse):
"""Complete a call with a human response."""
success = await completion_queue.complete_call(
call_id,
response=response.response,
tool_calls=response.tool_calls
)
if success:
return {"status": "success", "message": "Call completed"}
else:
raise HTTPException(status_code=404, detail="Call not found or already completed")
@app.post("/fail/{call_id}")
async def fail_call(call_id: str, error: Dict[str, str]):
"""Mark a call as failed."""
success = await completion_queue.fail_call(call_id, error.get("error", "Unknown error"))
if not success:
raise HTTPException(status_code=404, detail="Completion call not found or already completed")
return {"status": "failed"}
@app.get("/")
async def root():
"""Root endpoint."""
return {"message": "Human Completion Server is running"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8002)

View File

@@ -0,0 +1,630 @@
import gradio as gr
import json
import time
from typing import List, Dict, Any, Optional
from datetime import datetime
import requests
from .server import completion_queue
import base64
import io
from PIL import Image
class HumanCompletionUI:
def __init__(self, server_url: str = "http://localhost:8002"):
self.server_url = server_url
self.current_call_id: Optional[str] = None
self.refresh_interval = 2.0 # seconds
self.last_image = None # Store the last image for display
def format_messages_for_chatbot(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Format messages for display in gr.Chatbot with type='messages'."""
formatted = []
for msg in messages:
role = msg.get("role", "user")
content = msg.get("content", "")
tool_calls = msg.get("tool_calls", [])
# Handle different content formats
if isinstance(content, list):
# Multi-modal content - can include text and images
formatted_content = []
for item in content:
if item.get("type") == "text":
text = item.get("text", "")
if text.strip(): # Only add non-empty text
formatted_content.append(text)
elif item.get("type") == "image_url":
image_url = item.get("image_url", {}).get("url", "")
if image_url:
# Check if it's a base64 image or URL
if image_url.startswith("data:image"):
# For base64 images, decode and create gr.Image
try:
header, data = image_url.split(",", 1)
image_data = base64.b64decode(data)
image = Image.open(io.BytesIO(image_data))
formatted_content.append(gr.Image(value=image))
except Exception as e:
print(f"Error loading image: {e}")
formatted_content.append(f"[Image loading error: {e}]")
else:
# For URL images, create gr.Image with URL
formatted_content.append(gr.Image(value=image_url))
# Determine final content format
if len(formatted_content) == 1:
content = formatted_content[0]
elif len(formatted_content) > 1:
content = formatted_content
else:
content = "[Empty content]"
# Ensure role is valid for Gradio Chatbot
if role not in ["user", "assistant"]:
role = "assistant" if role == "system" else "user"
# Invert roles for better display in human UI context
# (what the AI says becomes "user", what human should respond becomes "assistant")
if role == "user":
role = "assistant"
else:
role = "user"
# Add the main message if it has content
if content and str(content).strip():
formatted.append({"role": role, "content": content})
# Handle tool calls - create separate messages for each tool call
if tool_calls:
for tool_call in tool_calls:
function_name = tool_call.get("function", {}).get("name", "unknown")
arguments_str = tool_call.get("function", {}).get("arguments", "{}")
try:
# Parse arguments to format them nicely
arguments = json.loads(arguments_str)
formatted_args = json.dumps(arguments, indent=2)
except json.JSONDecodeError:
# If parsing fails, use the raw string
formatted_args = arguments_str
# Create a formatted message for the tool call
tool_call_content = f"```json\n{formatted_args}\n```"
formatted.append({
"role": role,
"content": tool_call_content,
"metadata": {"title": f"🛠️ Used {function_name}"}
})
return formatted
def get_pending_calls(self) -> List[Dict[str, Any]]:
"""Get pending calls from the server."""
try:
response = requests.get(f"{self.server_url}/pending", timeout=5)
if response.status_code == 200:
return response.json().get("pending_calls", [])
except Exception as e:
print(f"Error fetching pending calls: {e}")
return []
    def complete_call_with_response(self, call_id: str, response: str) -> bool:
        """Complete a call with a text response."""
        return self.complete_call(call_id, response=response)
    def complete_call_with_tool_calls(self, call_id: str, tool_calls: List[Dict[str, Any]]) -> bool:
        """Complete a call with tool calls."""
        return self.complete_call(call_id, tool_calls=tool_calls)
def complete_call(self, call_id: str, response: Optional[str] = None, tool_calls: Optional[List[Dict[str, Any]]] = None) -> bool:
"""Complete a call with either a response or tool calls."""
try:
response_data = {}
if response:
response_data["response"] = response
if tool_calls:
response_data["tool_calls"] = tool_calls
response_obj = requests.post(
f"{self.server_url}/complete/{call_id}",
json=response_data,
timeout=10
)
response_obj.raise_for_status()
return True
except requests.RequestException as e:
print(f"Error completing call: {e}")
return False
def get_last_image_from_messages(self, messages: List[Dict[str, Any]]) -> Optional[Any]:
"""Extract the last image from the messages for display above conversation."""
last_image = None
for msg in reversed(messages): # Start from the last message
content = msg.get("content", "")
if isinstance(content, list):
for item in reversed(content): # Get the last image in the message
if item.get("type") == "image_url":
image_url = item.get("image_url", {}).get("url", "")
if image_url:
if image_url.startswith("data:image"):
# For base64 images, create a gr.Image component
try:
header, data = image_url.split(",", 1)
image_data = base64.b64decode(data)
image = Image.open(io.BytesIO(image_data))
return image
except Exception as e:
print(f"Error loading image: {e}")
continue
else:
# For URL images, return the URL
return image_url
return last_image
def refresh_pending_calls(self):
"""Refresh the list of pending calls."""
pending_calls = self.get_pending_calls()
if not pending_calls:
return (
gr.update(choices=["latest"], value="latest"), # dropdown
gr.update(value=None), # image (no image)
gr.update(value=[]), # chatbot (empty messages)
gr.update(interactive=False) # submit button
)
# Sort pending calls by created_at to get oldest first
sorted_calls = sorted(pending_calls, key=lambda x: x.get("created_at", ""))
# Create choices for dropdown
choices = [("latest", "latest")] # Add "latest" option first
for call in sorted_calls:
call_id = call["id"]
model = call.get("model", "unknown")
created_at = call.get("created_at", "")
# Format timestamp
try:
dt = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
time_str = dt.strftime("%H:%M:%S")
            except (ValueError, TypeError):
time_str = created_at
choice_label = f"{call_id[:8]}... ({model}) - {time_str}"
choices.append((choice_label, call_id))
        # Default to "latest", which shows the oldest pending conversation
        if sorted_calls:
# Use the oldest call (first in sorted list)
selected_call = sorted_calls[0]
conversation = self.format_messages_for_chatbot(selected_call.get("messages", []))
self.current_call_id = selected_call["id"]
# Get the last image from messages
self.last_image = self.get_last_image_from_messages(selected_call.get("messages", []))
else:
conversation = []
self.current_call_id = None
self.last_image = None
return (
gr.update(choices=choices, value="latest"),
gr.update(value=self.last_image),
gr.update(value=conversation),
gr.update(interactive=bool(choices))
)
def on_call_selected(self, selected_choice):
"""Handle when a call is selected from the dropdown."""
if not selected_choice:
return (
gr.update(value=None), # no image
gr.update(value=[]), # empty chatbot
gr.update(interactive=False)
)
pending_calls = self.get_pending_calls()
if not pending_calls:
return (
gr.update(value=None), # no image
gr.update(value=[]), # empty chatbot
gr.update(interactive=False)
)
# Handle "latest" option
if selected_choice == "latest":
# Sort calls by created_at to get oldest first
sorted_calls = sorted(pending_calls, key=lambda x: x.get("created_at", ""))
selected_call = sorted_calls[0] # Get the oldest call
call_id = selected_call["id"]
else:
# Extract call_id from the choice for specific calls
call_id = None
for call in pending_calls:
call_id_short = call["id"][:8]
if call_id_short in selected_choice:
call_id = call["id"]
break
if not call_id:
return (
gr.update(value=None), # no image
gr.update(value=[]), # empty chatbot
gr.update(interactive=False)
)
# Find the selected call
selected_call = next((c for c in pending_calls if c["id"] == call_id), None)
if not selected_call:
return (
gr.update(value=None), # no image
gr.update(value=[]), # empty chatbot
gr.update(interactive=False)
)
conversation = self.format_messages_for_chatbot(selected_call.get("messages", []))
self.current_call_id = call_id
# Get the last image from messages
self.last_image = self.get_last_image_from_messages(selected_call.get("messages", []))
return (
gr.update(value=self.last_image),
gr.update(value=conversation),
gr.update(interactive=True)
)
def submit_response(self, response_text: str):
"""Submit a text response to the current call."""
if not self.current_call_id:
return (
gr.update(value=response_text), # keep response text
gr.update(value="❌ No call selected") # status
)
if not response_text.strip():
return (
gr.update(value=response_text), # keep response text
gr.update(value="❌ Response cannot be empty") # status
)
success = self.complete_call_with_response(self.current_call_id, response_text)
if success:
status_msg = "✅ Response submitted successfully!"
return (
gr.update(value=""), # clear response text
gr.update(value=status_msg) # status
)
else:
return (
gr.update(value=response_text), # keep response text
gr.update(value="❌ Failed to submit response") # status
)
def submit_action(self, action_type: str, **kwargs) -> str:
"""Submit a computer action as a tool call."""
if not self.current_call_id:
return "❌ No call selected"
import uuid
# Create tool call structure
action_data = {"type": action_type, **kwargs}
tool_call = {
"id": f"call_{uuid.uuid4().hex[:24]}",
"type": "function",
"function": {
"name": "computer",
"arguments": json.dumps(action_data)
}
}
success = self.complete_call_with_tool_calls(self.current_call_id, [tool_call])
if success:
return f"{action_type.capitalize()} action submitted as tool call"
else:
return f"❌ Failed to submit {action_type} action"
def submit_click_action(self, x: int, y: int, action_type: str = "click", button: str = "left") -> str:
"""Submit a coordinate-based action."""
if action_type == "click":
return self.submit_action(action_type, x=x, y=y, button=button)
else:
return self.submit_action(action_type, x=x, y=y)
def submit_type_action(self, text: str) -> str:
"""Submit a type action."""
return self.submit_action("type", text=text)
def submit_hotkey_action(self, keys: str) -> str:
"""Submit a hotkey action."""
return self.submit_action("keypress", keys=keys)
def submit_description_click(self, description: str, action_type: str = "click", button: str = "left") -> str:
"""Submit a description-based action."""
if action_type == "click":
return self.submit_action(action_type, element_description=description, button=button)
else:
return self.submit_action(action_type, element_description=description)
def wait_for_pending_calls(self, max_seconds: float = 10.0, check_interval: float = 0.2):
"""Wait for pending calls to appear or until max_seconds elapsed.
This method loops and checks for pending calls at regular intervals,
returning as soon as a pending call is found or the maximum wait time is reached.
Args:
max_seconds: Maximum number of seconds to wait
check_interval: How often to check for pending calls (in seconds)
"""
        start_time = time.time()  # `time` is imported at module scope
while time.time() - start_time < max_seconds:
# Check if there are any pending calls
pending_calls = self.get_pending_calls()
if pending_calls:
# Found pending calls, return immediately
return self.refresh_pending_calls()
# Wait before checking again
time.sleep(check_interval)
# Max wait time reached, return current state
return self.refresh_pending_calls()
def create_ui():
"""Create the Gradio interface."""
ui_handler = HumanCompletionUI()
with gr.Blocks(title="Human-in-the-Loop Agent Tool") as demo:
gr.Markdown("# 🤖 Human-in-the-Loop Agent Tool")
gr.Markdown("Review AI conversation requests and provide human responses.")
with gr.Row():
with gr.Column(scale=2):
with gr.Group():
screenshot_image = gr.Image(
label="Screenshot",
interactive=False,
height=600
)
# Action type selection for image clicks
with gr.Row():
action_type_radio = gr.Radio(
label="Action Type",
choices=["click", "double_click", "move", "left_mouse_up", "left_mouse_down"],
value="click",
scale=2
)
action_button_radio = gr.Radio(
label="Button (for click only)",
choices=["left", "right", "wheel", "back", "forward"],
value="left",
visible=True,
scale=1
)
conversation_chatbot = gr.Chatbot(
label="Messages",
type="messages",
height=500,
show_copy_button=True
)
with gr.Column(scale=1):
with gr.Group():
call_dropdown = gr.Dropdown(
label="Select a pending call",
choices=["latest"],
interactive=True,
value="latest"
)
refresh_btn = gr.Button("🔄 Refresh", variant="secondary")
with gr.Group():
response_text = gr.Textbox(
label="Response",
lines=3,
placeholder="Enter your response here..."
)
submit_btn = gr.Button("📤 Submit Response", variant="primary", interactive=False)
# Action Accordions
with gr.Accordion("🖱️ Click Actions", open=False):
with gr.Group():
with gr.Row():
click_x = gr.Number(label="X", value=0, minimum=0)
click_y = gr.Number(label="Y", value=0, minimum=0)
with gr.Row():
click_action_type = gr.Dropdown(
label="Action Type",
choices=["click", "double_click", "move", "left_mouse_up", "left_mouse_down"],
value="click"
)
click_button = gr.Dropdown(
label="Button (for click only)",
choices=["left", "right", "wheel", "back", "forward"],
value="left"
)
click_submit_btn = gr.Button("Submit Action")
with gr.Accordion("📝 Type Action", open=False):
with gr.Group():
type_text = gr.Textbox(
label="Text to Type",
placeholder="Enter text to type..."
)
type_submit_btn = gr.Button("Submit Type")
with gr.Accordion("⌨️ Keypress Action", open=False):
with gr.Group():
keypress_text = gr.Textbox(
label="Keys",
placeholder="e.g., ctrl+c, alt+tab"
)
keypress_submit_btn = gr.Button("Submit Keypress")
with gr.Accordion("🎯 Description Action", open=False):
with gr.Group():
description_text = gr.Textbox(
label="Element Description",
placeholder="e.g., 'Privacy and security option in left sidebar'"
)
with gr.Row():
description_action_type = gr.Dropdown(
label="Action Type",
choices=["click", "double_click", "move", "left_mouse_up", "left_mouse_down"],
value="click"
)
description_button = gr.Radio(
label="Button (for click only)",
choices=["left", "right", "wheel", "back", "forward"],
value="left"
)
description_submit_btn = gr.Button("Submit Description Action")
status_display = gr.Textbox(
label="Status",
interactive=False,
value="Ready to receive calls..."
)
# Event handlers
refresh_btn.click(
fn=ui_handler.refresh_pending_calls,
outputs=[call_dropdown, screenshot_image, conversation_chatbot, submit_btn]
)
call_dropdown.change(
fn=ui_handler.on_call_selected,
inputs=[call_dropdown],
outputs=[screenshot_image, conversation_chatbot, submit_btn]
)
    def handle_image_click(action_type, button, evt: gr.SelectData):
        # Radio values arrive via the Gradio `inputs` below; reading .value on
        # the components directly would return their initial values only.
        if evt.index is not None:
            x, y = evt.index
            return ui_handler.submit_click_action(x, y, action_type or "click", button or "left")
        return "No coordinates selected"
    screenshot_image.select(
        fn=handle_image_click,
        inputs=[action_type_radio, action_button_radio],
        outputs=[status_display]
).then(
fn=ui_handler.wait_for_pending_calls,
outputs=[call_dropdown, screenshot_image, conversation_chatbot, submit_btn]
)
# Response submission
submit_btn.click(
fn=ui_handler.submit_response,
inputs=[response_text],
outputs=[response_text, status_display]
).then(
fn=ui_handler.refresh_pending_calls,
outputs=[call_dropdown, screenshot_image, conversation_chatbot, submit_btn]
)
# Toggle button radio visibility based on action type
def toggle_button_visibility(action_type):
return gr.update(visible=(action_type == "click"))
action_type_radio.change(
fn=toggle_button_visibility,
inputs=[action_type_radio],
outputs=[action_button_radio]
)
# Action accordion handlers
click_submit_btn.click(
fn=ui_handler.submit_click_action,
inputs=[click_x, click_y, click_action_type, click_button],
outputs=[status_display]
).then(
fn=ui_handler.wait_for_pending_calls,
outputs=[call_dropdown, screenshot_image, conversation_chatbot, submit_btn]
)
type_submit_btn.click(
fn=ui_handler.submit_type_action,
inputs=[type_text],
outputs=[status_display]
).then(
fn=ui_handler.wait_for_pending_calls,
outputs=[call_dropdown, screenshot_image, conversation_chatbot, submit_btn]
)
keypress_submit_btn.click(
fn=ui_handler.submit_hotkey_action,
inputs=[keypress_text],
outputs=[status_display]
).then(
fn=ui_handler.wait_for_pending_calls,
outputs=[call_dropdown, screenshot_image, conversation_chatbot, submit_btn]
)
    def handle_description_submit(description, action_type, button):
        if description:
            # The .then() chain below refreshes pending calls, so no inline wait is needed
            return ui_handler.submit_description_click(description, action_type, button)
        return "Please enter a description"
description_submit_btn.click(
fn=handle_description_submit,
inputs=[description_text, description_action_type, description_button],
outputs=[status_display]
).then(
fn=ui_handler.wait_for_pending_calls,
outputs=[call_dropdown, screenshot_image, conversation_chatbot, submit_btn]
)
# Load initial data
demo.load(
fn=ui_handler.refresh_pending_calls,
outputs=[call_dropdown, screenshot_image, conversation_chatbot, submit_btn]
)
return demo
if __name__ == "__main__":
demo = create_ui()
demo.queue()
demo.launch(server_name="0.0.0.0", server_port=7860)
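A quick sketch of the message shape `format_messages_for_chatbot` accepts (shapes inferred from the handling above; the URL and coordinates are illustrative):

```python
ui = HumanCompletionUI()
example = [
    {"role": "user", "content": [
        {"type": "text", "text": "Click the Submit button."},
        {"type": "image_url", "image_url": {"url": "https://example.com/screenshot.png"}},
    ]},
    {"role": "assistant", "content": "", "tool_calls": [
        {"function": {"name": "computer", "arguments": '{"type": "click", "x": 10, "y": 20}'}},
    ]},
]
# Roles are inverted for the human-facing chat view, and each tool call
# becomes its own message with a "🛠️ Used computer" title.
print(ui.format_messages_for_chatbot(example))
```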

View File

@@ -0,0 +1,77 @@
"""HUD integration for ComputerAgent."""
import logging
from typing import Any, Optional, Dict
from hud import run_job as hud_run_job
from .agent import ComputerAgent
from .adapter import ComputerAgentAdapter
from .computer_handler import HUDComputerHandler
async def run_job(
model: str,
task_or_taskset: Any,
job_name: str,
# Job kwargs
auto_reply_question: bool = False,
adapter_cls: Any = None,
adapter_kwargs: Optional[Dict[str, Any]] = None,
max_steps_per_task: int = 20,
run_parallel: bool = True,
job_metadata: Optional[Dict[str, Any]] = None,
show_progress: bool = True,
max_concurrent_env_creations: Optional[int] = 30, # Limits gym.make calls
max_concurrent_agent_predictions: Optional[int] = None, # No limit on LLM calls
max_concurrent_tasks: Optional[int] = 30, # Limits overall task concurrency
**agent_kwargs: Any
) -> Any:
"""
Run a job using ComputerAgent with the specified model.
Args:
model: Model string for ComputerAgent (e.g., "anthropic/claude-3-5-sonnet-20241022")
task_or_taskset: Task or TaskSet to run
job_name: Name for the job
auto_reply_question: Whether to auto-reply to questions
adapter_cls: Custom adapter class (defaults to ComputerAgentAdapter)
adapter_kwargs: Additional kwargs for the adapter
max_steps_per_task: Maximum steps per task
run_parallel: Whether to run tasks in parallel
job_metadata: Additional metadata for the job
show_progress: Whether to show progress
max_concurrent_env_creations: Max concurrent environment creations
max_concurrent_agent_predictions: Max concurrent agent predictions
max_concurrent_tasks: Max concurrent tasks
**agent_kwargs: Additional kwargs to pass to ComputerAgent
Returns:
Job instance from HUD
"""
    # Fold a boolean `verbose` kwarg into `verbosity`, then derive HUD's
    # `verbose` flag; note that lower logging levels mean *more* verbose output.
    if agent_kwargs.pop("verbose", False):
        agent_kwargs["verbosity"] = logging.INFO
    verbose = agent_kwargs.get("verbosity", logging.WARNING) <= logging.INFO
# run job
return await hud_run_job(
agent_cls=ComputerAgent,
agent_kwargs={"model": model, **agent_kwargs},
task_or_taskset=task_or_taskset,
job_name=job_name,
auto_reply_question=auto_reply_question,
adapter_cls=adapter_cls,
adapter_kwargs=adapter_kwargs,
max_steps_per_task=max_steps_per_task,
run_parallel=run_parallel,
job_metadata=job_metadata,
show_progress=show_progress,
verbose=verbose,
max_concurrent_env_creations=max_concurrent_env_creations,
max_concurrent_agent_predictions=max_concurrent_agent_predictions,
max_concurrent_tasks=max_concurrent_tasks
)
__all__ = ["ComputerAgent", "ComputerAgentAdapter", "HUDComputerHandler", "run_job"]
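A usage sketch (taskset loading follows the HUD API; the model string and limits are illustrative). Any extra keyword arguments, such as `trajectory_dir`, are forwarded to `ComputerAgent`:

```python
import asyncio
from hud import load_taskset
from agent.integrations.hud import run_job

async def main():
    taskset = await load_taskset("OSWorld-Verified")
    job = await run_job(
        model="anthropic/claude-3-5-sonnet-20241022",
        task_or_taskset=taskset,
        job_name="osworld-verified-test",
        max_steps_per_task=50,
        max_concurrent_tasks=10,          # throttle overall task concurrency
        trajectory_dir="./trajectories",  # forwarded to ComputerAgent via **agent_kwargs
    )
    print(job)

asyncio.run(main())
```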

View File

@@ -0,0 +1,121 @@
"""HUD Adapter for ComputerAgent integration."""
from __future__ import annotations
from typing import Any, ClassVar
from hud.adapters.common import CLA, Adapter
from hud.adapters.common.types import (
CLAButton,
CLAKey,
ClickAction,
CustomAction,
DragAction,
MoveAction,
Point,
PressAction,
ResponseAction,
ScreenshotFetch,
ScrollAction,
TypeAction,
WaitAction,
)
class ComputerAgentAdapter(Adapter):
"""Adapter for ComputerAgent to work with HUD."""
KEY_MAP: ClassVar[dict[str, CLAKey]] = {
"return": "enter",
"arrowup": "up",
"arrowdown": "down",
"arrowleft": "left",
"arrowright": "right",
"cmd": "ctrl",
"super": "win",
"meta": "win",
}
BUTTON_MAP: ClassVar[dict[str, CLAButton]] = {
"wheel": "middle",
"middle": "middle",
}
def __init__(self) -> None:
super().__init__()
# ComputerAgent default dimensions (can be overridden)
self.agent_width = 1024
self.agent_height = 768
def _map_key(self, key: str) -> CLAKey:
"""Map a key to its standardized form."""
return self.KEY_MAP.get(key.lower(), key.lower()) # type: ignore
def convert(self, data: Any) -> CLA:
"""Convert a ComputerAgent action to a HUD action."""
try:
action_type = data.get("type")
if action_type == "click":
x, y = data.get("x", 0), data.get("y", 0)
button = data.get("button", "left")
button = self.BUTTON_MAP.get(button, button)
if button is None:
button = "left"
converted_action = ClickAction(point=Point(x=x, y=y), button=button)
elif action_type == "double_click":
x, y = data.get("x", 0), data.get("y", 0)
converted_action = ClickAction(point=Point(x=x, y=y), button="left", pattern=[100])
elif action_type == "scroll":
x, y = int(data.get("x", 0)), int(data.get("y", 0))
scroll_x = int(data.get("scroll_x", 0))
scroll_y = int(data.get("scroll_y", 0))
converted_action = ScrollAction(
point=Point(x=x, y=y), scroll=Point(x=scroll_x, y=scroll_y)
)
elif action_type == "type":
text = data.get("text", "")
converted_action = TypeAction(text=text, enter_after=False)
elif action_type == "wait":
ms = data.get("ms", 1000)
converted_action = WaitAction(time=ms)
elif action_type == "move":
x, y = data.get("x", 0), data.get("y", 0)
converted_action = MoveAction(point=Point(x=x, y=y))
elif action_type == "keypress":
keys = data.get("keys", [])
if isinstance(keys, str):
keys = [keys]
converted_action = PressAction(keys=[self._map_key(k) for k in keys])
elif action_type == "drag":
path = data.get("path", [])
points = [Point(x=p.get("x", 0), y=p.get("y", 0)) for p in path]
converted_action = DragAction(path=points)
elif action_type == "screenshot":
converted_action = ScreenshotFetch()
elif action_type == "response":
converted_action = ResponseAction(text=data.get("text", ""))
elif action_type == "custom":
converted_action = CustomAction(action=data.get("action", ""))
else:
raise ValueError(f"Unsupported action type: {action_type}")
# Add reasoning and logs if available
converted_action.reasoning = data.get("reasoning", "")
converted_action.logs = data.get("logs", "")
return converted_action
except Exception as e:
raise ValueError(f"Invalid action: {data}. Error: {e!s}") from e
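To illustrate the conversion contract (a sketch; assumes `hud` is installed so the CLA types resolve):

```python
adapter = ComputerAgentAdapter()

# ComputerAgent-style dicts in, HUD CLA actions out
click = adapter.convert({"type": "click", "x": 120, "y": 340, "button": "left"})
copy_keys = adapter.convert({"type": "keypress", "keys": ["ctrl", "c"]})
print(click, copy_keys)

# Unsupported action types surface as ValueError with the offending payload:
# adapter.convert({"type": "teleport"})  # -> ValueError: Invalid action: ...
```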

View File

@@ -0,0 +1,373 @@
"""HUD ComputerAgent wrapper for OSWorld benchmarking."""
import logging
from typing import Any, Literal, Optional, Union, List, Dict
import asyncio
from agent import ComputerAgent as BaseComputerAgent
from agent.responses import make_failed_tool_call_items
from hud.adapters import Adapter
from hud.agent.base import Agent
from hud.utils.common import Observation
from hud.adapters.common.types import LogType
from hud.types import Gym
from .adapter import ComputerAgentAdapter
from .computer_handler import HUDComputerHandler
logger = logging.getLogger(__name__)
BASE_SYSTEM_PROMPT = """
You are an autonomous computer-using agent. Follow these guidelines:
1. Be decisive and complete tasks without asking for confirmation unless absolutely necessary.
2. Use the computer tools to complete the task and do not stop until the task is complete.
3. Do NOT ask questions like "Should I proceed?" or "Would you like me to continue?" - just proceed with the task.
4. When you find what you're looking for (e.g., a file to upload), proceed with the action directly.
5. Only stop when the task is fully complete or if you encounter an error that prevents completion.
6. Trust that the user wants you to complete the entire task they've requested.
7. You must say "Task completed" when the task is complete.
Remember: You have been given permission to complete the requested task autonomously.
""".strip()
class ComputerAgent(Agent[BaseComputerAgent, dict[str, Any]]):
"""
A ComputerAgent wrapper for HUD integration.
This agent wraps the base ComputerAgent to work with HUD environments,
providing the same interface as OperatorAgent but using ComputerAgent internally.
"""
transfer_gyms: dict[Gym, Gym] = {"qa": "hud-browser"}
def __init__(
self,
model: str = "anthropic/claude-3-5-sonnet-20241022",
environment: Literal["windows", "mac", "linux", "browser"] = "linux",
adapter: Optional[Adapter] = None,
name: Optional[str] = None,
**kwargs: Any,
):
"""
Initialize the ComputerAgent for HUD.
Args:
model: The model string for ComputerAgent (e.g., "anthropic/claude-3-5-sonnet-20241022")
environment: The environment type (windows, mac, linux, browser)
adapter: The adapter to use for preprocessing and postprocessing
name: The name of the agent
**kwargs: Additional arguments passed to ComputerAgent
"""
# Create adapter if not provided
adapter = adapter or ComputerAgentAdapter()
if name is None:
name = f"computeragent-{model.split('/')[-1]}"
# Initialize the base Agent class without client (we'll create it later)
super().__init__(client=None, adapter=adapter, name=name)
self.model = model
self.environment = environment
self.kwargs = kwargs
# Default dimensions
self.width = 1024
self.height = 768
# Update dimensions if adapter is provided
if self.adapter:
self.width = self.adapter.agent_width
self.height = self.adapter.agent_height
# Create HUD computer handler
self.hud_computer = HUDComputerHandler(
environment=environment,
dimensions=(self.width, self.height)
)
# Handle trajectory_dir by adding TrajectorySaverCallback
trajectory_dir = kwargs.pop("trajectory_dir", None)
callbacks = kwargs.get("callbacks", [])
if trajectory_dir:
from agent.callbacks.trajectory_saver import TrajectorySaverCallback
trajectory_callback = TrajectorySaverCallback(trajectory_dir, reset_on_run=False)
callbacks = callbacks + [trajectory_callback]
kwargs["callbacks"] = callbacks
# Initialize ComputerAgent with HUD computer handler
self.computer_agent = BaseComputerAgent(
model=model,
tools=[self.hud_computer],
**kwargs
)
# Set the client to the computer_agent for compatibility
self.client = self.computer_agent
# State tracking
self.conversation_history: List[Dict[str, Any]] = []
self.initial_prompt: Optional[str] = None
# System prompt for computer use tasks
self.base_system_prompt = BASE_SYSTEM_PROMPT
async def fetch_response(self, observation: Observation) -> tuple[list[dict[str, Any]], bool]:
"""
Fetch a response from ComputerAgent based on the observation.
Args:
observation: The preprocessed observation, attributes:
screenshot: Base64 encoded PNG string of the screen
text: Text observation, if available
        Returns:
            tuple[list[dict[str, Any]], bool]: a tuple containing the list of raw
            actions and a boolean indicating whether the agent believes the task
            is complete.
        """
try:
# Update the computer handler with the current screenshot
if observation.screenshot:
self.hud_computer.update_screenshot(observation.screenshot)
# Set up action callback to capture actions
captured_actions = []
action_done = False
async def action_callback(action: Dict[str, Any]) -> None:
"""Callback to capture actions from ComputerAgent."""
nonlocal captured_actions, action_done
captured_actions.append(action)
# Set the action callback
self.hud_computer.set_action_callback(action_callback)
# Prepare the message for ComputerAgent
if not self.conversation_history:
# First interaction - use the observation text as initial prompt
if observation.text:
self.initial_prompt = observation.text
message = f"{self.base_system_prompt}\n\nTask: {observation.text}"
else:
message = f"{self.base_system_prompt}\n\nPlease analyze the current screen and determine what action to take."
input_content = [
{"type": "input_text", "text": message}
]
# Add screenshot if present
if observation.screenshot:
input_content.append(
{
"type": "input_image",
"image_url": f"data:image/png;base64,{observation.screenshot}",
}
)
self.conversation_history.append({"role": "user", "content": input_content})
else:
# Subsequent interactions - check if last action was computer_call
# If so, add computer_call_output with screenshot instead of user message
last_computer_calls = []
for msg in reversed(self.conversation_history):
if msg.get("type") == "computer_call":
call_id = msg.get("call_id")
if call_id:
# Check if this call_id already has a computer_call_output
has_output = any(
m.get("type") == "computer_call_output" and m.get("call_id") == call_id
for m in self.conversation_history
)
if not has_output:
last_computer_calls.append(call_id)
if last_computer_calls:
                if not observation.screenshot:
                    logger.warning("No screenshot in observation; using the handler's last known screenshot")
screenshot_b64 = await self.hud_computer.screenshot()
# Add computer_call_output for each unresponded computer_call
for call_id in reversed(last_computer_calls): # Maintain order
self.conversation_history.append({
"type": "computer_call_output",
"call_id": call_id,
"output": {
"type": "input_image",
"image_url": f"data:image/png;base64,{screenshot_b64}"
}
})
else:
# No computer_call found, add regular user message
message = "Continue with the task based on the current screen state."
input_content = [
{"type": "input_text", "text": message}
]
# Add screenshot if present
if observation.screenshot:
input_content.append(
{
"type": "input_image",
"image_url": f"data:image/png;base64,{observation.screenshot}",
}
)
self.conversation_history.append({"role": "user", "content": input_content})
# If the last message is a reasoning message, change it to output_text
if (self.conversation_history and
self.conversation_history[-1].get("type") == "reasoning" and
self.conversation_history[-1].get("summary")):
reasoning_msg = self.conversation_history[-1]
summary_texts = []
# Extract all summary_text entries
for summary_item in reasoning_msg["summary"]:
if summary_item.get("type") == "summary_text":
summary_texts.append(summary_item.get("text", ""))
# Convert to message format with output_text
if summary_texts:
converted_message = {
"type": "message",
"role": "assistant",
"content": [
{
"text": " ".join(summary_texts),
"type": "output_text"
}
]
}
# Replace the reasoning message with the converted message
self.conversation_history[-1] = converted_message
# Run ComputerAgent
try:
new_items = []
# ComputerAgent.run returns an async generator
try:
                    async for result in self.computer_agent.run(self.conversation_history, stream=False):
                        output = result.get("output", [])
                        # if the result ends with a computer_call_output, immediately exit
                        if output and output[-1].get("type") == "computer_call_output":
                            break
                        # otherwise add agent output to conversation history
                        new_items += output
except Exception as e:
                    # if the last item is reasoning, convert it to an assistant output_text message
                    if new_items and new_items[-1].get("type") == "reasoning":
                        summary = new_items[-1].get("summary") or [{}]
                        new_items[-1] = {
                            "type": "message",
                            "role": "assistant",
                            "content": [
                                {
                                    "text": summary[0].get("text", ""),
                                    "type": "output_text"
                                }
                            ]
                        }
# Check if there are any computer_call items in new_items
computer_calls = [item for item in new_items if item.get("type") == "computer_call"]
if computer_calls:
# Remove computer_call items from new_items
new_items = [item for item in new_items if item.get("type") != "computer_call"]
# Add failed tool call items for each computer call
for computer_call in computer_calls:
tool_input = computer_call.get("action", {})
call_id = computer_call.get("call_id")
new_items.extend(make_failed_tool_call_items(
tool_name="computer",
tool_kwargs=tool_input,
error_message=repr(e),
call_id=call_id
))
else:
# add error message to conversation history (fallback for non-computer-call errors)
                        new_items.append({
                            "role": "user",
"content": [
{
"type": "input_text",
"text": f"Error during previous attempted action: {repr(e)}"
}
]
})
# Check if we captured any actions
if captured_actions:
# Extract reasoning from the conversation history
reasoning = ""
# Look for the latest reasoning message
for msg in reversed(new_items):
if msg.get("type") == "reasoning" and msg.get("summary"):
reasoning = " ".join([s.get("text", "") for s in msg["summary"] if s.get("type") == "summary_text"])
break
elif msg.get("type") == "message" and msg.get("role") == "assistant":
content = msg.get("content", [])
if isinstance(content, list):
reasoning = " ".join([c.get("text", "") for c in content if c.get("type") == "output_text"])
break
# update conversation history
self.conversation_history += new_items
# Add reasoning and logs to each action
for action in captured_actions:
action["reasoning"] = reasoning
action["logs"] = {"conversation_length": len(self.conversation_history)}
return captured_actions, False
# Check if the last message is "Task completed"
response_text = ""
for msg in reversed(new_items):
if msg.get("type") == "message" and msg.get("role") == "assistant":
content = msg.get("content", [])
for c in content:
if c.get("type") == "output_text":
response_text = c.get("text", response_text)
break
break
done = "task completed" in response_text.lower()
# update conversation history
self.conversation_history += new_items
response_action = {
"type": "response",
"text": response_text,
"reasoning": response_text,
"logs": {"conversation_length": len(self.conversation_history)}
}
# Check if this indicates task completion or failure
if "task is infeasible" in response_text.lower():
response_action = {"type": "custom", "action": "FAIL"}
done = True
return [response_action], done
except Exception as e:
logger.error(f"Error running ComputerAgent: {e}")
# Return an error response
error_action = {
"type": "response",
"text": f"Error occurred: {str(e)}",
"reasoning": f"ComputerAgent encountered an error: {str(e)}",
"logs": {"error": str(e)}
}
return [error_action], True
except Exception as e:
logger.error(f"Error in fetch_response: {e}")
error_action = {
"type": "response",
"text": f"Error in agent processing: {str(e)}",
"reasoning": f"Agent processing error: {str(e)}",
"logs": {"error": str(e)}
}
return [error_action], True
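Construction notes, as a sketch (the forwarded kwarg below is illustrative, not an exhaustive list): anything not consumed by the wrapper flows through to the underlying `agent.ComputerAgent`, and `trajectory_dir` is intercepted and turned into a `TrajectorySaverCallback`:

```python
hud_agent = ComputerAgent(
    model="anthropic/claude-3-5-sonnet-20241022",
    environment="linux",
    trajectory_dir="./trajectories",  # becomes a TrajectorySaverCallback
    only_n_most_recent_images=3,      # hypothetical forwarded kwarg, passed to the base agent
)
```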

View File

@@ -0,0 +1,187 @@
"""HUD Computer Handler for ComputerAgent integration."""
import base64
from io import BytesIO
from typing import Literal, Optional, Any, Dict, Callable
from PIL import Image
from agent.computers import AsyncComputerHandler
class HUDComputerHandler(AsyncComputerHandler):
"""Computer handler that interfaces with HUD environment."""
def __init__(
self,
environment: Literal["windows", "mac", "linux", "browser"] = "linux",
dimensions: tuple[int, int] = (1024, 768),
screenshot_callback: Optional[Callable] = None,
action_callback: Optional[Callable] = None,
):
"""
Initialize HUD computer handler.
Args:
environment: The environment type for HUD
dimensions: Screen dimensions as (width, height)
screenshot_callback: Optional callback to get screenshots from HUD environment
action_callback: Optional callback to execute actions in HUD environment
"""
super().__init__()
self._environment = environment
self._dimensions = dimensions
self._screenshot_callback = screenshot_callback
self._action_callback = action_callback
# Store the last screenshot for reuse
self._last_screenshot: Optional[str] = None
def set_screenshot_callback(self, callback: Callable) -> None:
"""Set the screenshot callback."""
self._screenshot_callback = callback
def set_action_callback(self, callback: Callable) -> None:
"""Set the action callback."""
self._action_callback = callback
def update_screenshot(self, screenshot: str) -> None:
"""Update the stored screenshot (base64 string)."""
self._last_screenshot = screenshot
async def get_environment(self) -> Literal["windows", "mac", "linux", "browser"]:
"""Get the current environment type."""
return self._environment # type: ignore
async def get_dimensions(self) -> tuple[int, int]:
"""Get screen dimensions as (width, height)."""
return self._dimensions
async def screenshot(self) -> str:
"""Take a screenshot and return as base64 string."""
if self._screenshot_callback:
screenshot = await self._screenshot_callback()
if isinstance(screenshot, str):
self._last_screenshot = screenshot
return screenshot
elif isinstance(screenshot, Image.Image):
# Convert PIL Image to base64
buffer = BytesIO()
screenshot.save(buffer, format="PNG")
screenshot_b64 = base64.b64encode(buffer.getvalue()).decode()
self._last_screenshot = screenshot_b64
return screenshot_b64
elif isinstance(screenshot, bytes):
screenshot_b64 = base64.b64encode(screenshot).decode()
self._last_screenshot = screenshot_b64
return screenshot_b64
# Return last screenshot if available, otherwise create a blank one
if self._last_screenshot:
return self._last_screenshot
# Create a blank screenshot as fallback
blank_image = Image.new('RGB', self._dimensions, color='white')
buffer = BytesIO()
blank_image.save(buffer, format="PNG")
screenshot_b64 = base64.b64encode(buffer.getvalue()).decode()
self._last_screenshot = screenshot_b64
return screenshot_b64
async def click(self, x: int, y: int, button: str = "left") -> None:
"""Click at coordinates with specified button."""
if self._action_callback:
await self._action_callback({
"type": "click",
"x": x,
"y": y,
"button": button
})
async def double_click(self, x: int, y: int) -> None:
"""Double click at coordinates."""
if self._action_callback:
await self._action_callback({
"type": "double_click",
"x": x,
"y": y
})
async def scroll(self, x: int, y: int, scroll_x: int, scroll_y: int) -> None:
"""Scroll at coordinates with specified scroll amounts."""
if self._action_callback:
await self._action_callback({
"type": "scroll",
"x": x,
"y": y,
"scroll_x": scroll_x,
"scroll_y": scroll_y
})
async def type(self, text: str) -> None:
"""Type text."""
if self._action_callback:
await self._action_callback({
"type": "type",
"text": text
})
async def wait(self, ms: int = 1000) -> None:
"""Wait for specified milliseconds."""
if self._action_callback:
await self._action_callback({
"type": "wait",
"ms": ms
})
async def move(self, x: int, y: int) -> None:
"""Move cursor to coordinates."""
if self._action_callback:
await self._action_callback({
"type": "move",
"x": x,
"y": y
})
async def keypress(self, keys: list[str] | str) -> None:
"""Press key combination."""
if isinstance(keys, str):
keys = [keys]
if self._action_callback:
await self._action_callback({
"type": "keypress",
"keys": keys
})
async def drag(self, path: list[dict[str, int]]) -> None:
"""Drag along a path of points."""
if self._action_callback:
await self._action_callback({
"type": "drag",
"path": path
})
async def left_mouse_down(self, x: Optional[int] = None, y: Optional[int] = None) -> None:
"""Left mouse down at coordinates."""
if self._action_callback:
await self._action_callback({
"type": "left_mouse_down",
"x": x,
"y": y
})
async def left_mouse_up(self, x: Optional[int] = None, y: Optional[int] = None) -> None:
"""Left mouse up at coordinates."""
if self._action_callback:
await self._action_callback({
"type": "left_mouse_up",
"x": x,
"y": y
})
async def get_current_url(self) -> str:
"""Get the current URL."""
if self._action_callback:
return await self._action_callback({
"type": "get_current_url"
})
return ""
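A minimal wiring sketch (the callbacks are illustrative stand-ins for a real HUD environment):

```python
import asyncio

async def take_screenshot() -> str:
    # In practice this would return a base64 PNG from the HUD environment;
    # the placeholder string is stored and echoed back by the handler.
    return "iVBORw0KGgo..."

async def execute(action: dict) -> None:
    print("would execute:", action)

handler = HUDComputerHandler(environment="linux", dimensions=(1024, 768))
handler.set_screenshot_callback(take_screenshot)
handler.set_action_callback(execute)

asyncio.run(handler.click(100, 200))      # -> would execute: {'type': 'click', ...}
print(asyncio.run(handler.screenshot()))  # -> "iVBORw0KGgo..."
```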

View File

@@ -9,7 +9,7 @@ from litellm import ResponseInputParam, ResponsesAPIResponse, ToolParam
from collections.abc import Iterable
# Agent input types
Messages = str | ResponseInputParam
Messages = str | ResponseInputParam | List[Dict[str, Any]]
Tools = Optional[Iterable[ToolParam]]
# Agent output types
@@ -27,55 +27,3 @@ class AgentConfigInfo(BaseModel):
def matches_model(self, model: str) -> bool:
"""Check if this agent config matches the given model"""
return bool(re.match(self.models_regex, model))
# Computer tool interface
class Computer(Protocol):
"""Protocol defining the interface for computer interactions."""
async def get_environment(self) -> Literal["windows", "mac", "linux", "browser"]:
"""Get the current environment type."""
...
async def get_dimensions(self) -> tuple[int, int]:
"""Get screen dimensions as (width, height)."""
...
async def screenshot(self) -> str:
"""Take a screenshot and return as base64 string."""
...
async def click(self, x: int, y: int, button: str = "left") -> None:
"""Click at coordinates with specified button."""
...
async def double_click(self, x: int, y: int) -> None:
"""Double click at coordinates."""
...
async def scroll(self, x: int, y: int, scroll_x: int, scroll_y: int) -> None:
"""Scroll at coordinates with specified scroll amounts."""
...
async def type(self, text: str) -> None:
"""Type text."""
...
async def wait(self, ms: int = 1000) -> None:
"""Wait for specified milliseconds."""
...
async def move(self, x: int, y: int) -> None:
"""Move cursor to coordinates."""
...
async def keypress(self, keys: List[str]) -> None:
"""Press key combination."""
...
async def drag(self, path: List[Dict[str, int]]) -> None:
"""Drag along specified path."""
...
async def get_current_url(self) -> str:
"""Get current URL (for browser environments)."""
...

View File

@@ -42,6 +42,11 @@ uitars-hf = [
"torch",
"transformers>=4.54.0"
]
glm45v-hf = [
"accelerate",
"torch",
"transformers-v4.55.0-GLM-4.5V-preview"
]
ui = [
"gradio>=5.23.3",
"python-dotenv>=1.0.1",
@@ -49,10 +54,8 @@ ui = [
cli = [
"yaspin>=3.1.0",
]
glm45v-hf = [
"accelerate",
"torch",
"transformers-v4.55.0-GLM-4.5V-preview"
hud = [
"hud-python==0.2.10",
]
all = [
# omni requirements
@@ -68,6 +71,8 @@ all = [
"python-dotenv>=1.0.1",
# cli requirements
"yaspin>=3.1.0",
# hud requirements
"hud-python==0.2.10",
]
[tool.uv]

notebooks/eval_osworld.ipynb (new file, 110050 lines; diff suppressed because it is too large)