Added agent proxy

This commit is contained in:
Dillon DuPont
2025-08-14 12:58:51 -04:00
parent 8967835e42
commit 66c0d7f1f1
7 changed files with 963 additions and 0 deletions
+237
View File
@@ -0,0 +1,237 @@
# ComputerAgent Proxy
A proxy server that exposes ComputerAgent functionality over HTTP and P2P (WebRTC) connections. This allows remote clients to interact with ComputerAgent instances through a simple REST-like API.
## Features
- **HTTP Server**: Standard HTTP REST API using Starlette
- **P2P Server**: WebRTC-based peer-to-peer connections using peerjs-python
- **No Pydantic**: Uses plain dictionaries for request/response handling
- **OpenAI-compatible API**: Similar request format to OpenAI's API
- **Multi-modal Support**: Handles text and image inputs
- **Configurable**: Supports custom agent and computer configurations
## Installation
The proxy requires additional dependencies:
```bash
# For HTTP server
pip install starlette uvicorn
# For P2P server (optional)
pip install peerjs-python aiortc
```
## Usage
### Starting the Server
```bash
# HTTP server only (default)
python -m agent.proxy.cli
# Custom host/port
python -m agent.proxy.cli --host 0.0.0.0 --port 8080
# P2P server only
python -m agent.proxy.cli --mode p2p --peer-id my-agent-proxy
# Both HTTP and P2P
python -m agent.proxy.cli --mode both --peer-id my-agent-proxy
```
### API Endpoints
#### POST /responses
Process a request using ComputerAgent and return the first result.
**Request Format:**
```json
{
"model": "anthropic/claude-3-5-sonnet-20241022",
"input": "Your instruction here",
"agent_kwargs": {
"save_trajectory": true,
"verbosity": 20
},
"computer_kwargs": {
"os_type": "linux",
"provider_type": "cloud"
}
}
```
**Multi-modal Request:**
```json
{
"model": "anthropic/claude-3-5-sonnet-20241022",
"input": [
{
"role": "user",
"content": [
{"type": "input_text", "text": "what is in this image?"},
{
"type": "input_image",
"image_url": "https://example.com/image.jpg"
}
]
}
]
}
```
**Response Format:**
```json
{
"success": true,
"result": {
// Agent response data
},
"model": "anthropic/claude-3-5-sonnet-20241022"
}
```
#### GET /health
Health check endpoint.
### cURL Examples
```bash
# Simple text request
curl http://localhost:8000/responses \
-H "Content-Type: application/json" \
-d '{
"model": "anthropic/claude-3-5-sonnet-20241022",
"input": "Tell me a three sentence bedtime story about a unicorn."
}'
# Multi-modal request
curl http://localhost:8000/responses \
-H "Content-Type: application/json" \
-d '{
"model": "anthropic/claude-3-5-sonnet-20241022",
"input": [
{
"role": "user",
"content": [
{"type": "input_text", "text": "what is in this image?"},
{
"type": "input_image",
"image_url": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg"
}
]
}
]
}'
```
### P2P Usage
The P2P server allows WebRTC connections for direct peer-to-peer communication:
```python
# Connect to P2P proxy
from peerjs import Peer, PeerOptions
import json
peer = Peer(id="client", peer_options=PeerOptions(host="localhost", port=9000))
await peer.start()
connection = peer.connect("computer-agent-proxy")
# Send request
request = {
"model": "anthropic/claude-3-5-sonnet-20241022",
"input": "Hello from P2P!"
}
await connection.send(json.dumps(request))
```
## Configuration
### Environment Variables
- `CUA_CONTAINER_NAME`: Default container name for cloud provider
- `CUA_API_KEY`: Default API key for cloud provider
### Request Parameters
- `model`: Model string (required) - e.g., "anthropic/claude-3-5-sonnet-20241022"
- `input`: String or message array (required)
- `agent_kwargs`: Optional agent configuration
- `computer_kwargs`: Optional computer configuration
### Agent Configuration (`agent_kwargs`)
Common options:
- `save_trajectory`: Boolean - Save conversation trajectory
- `verbosity`: Integer - Logging level (10=DEBUG, 20=INFO, etc.)
- `max_trajectory_budget`: Float - Budget limit for trajectory
### Computer Configuration (`computer_kwargs`)
Common options:
- `os_type`: String - "linux", "windows", "macos"
- `provider_type`: String - "cloud", "local", "docker"
- `name`: String - Instance name
- `api_key`: String - Provider API key
## Architecture
```
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ HTTP Client │ │ P2P Client │ │ Direct Usage │
└─────────┬───────┘ └─────────┬───────┘ └─────────┬───────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────┐
│ ProxyServer │
├─────────────────────────────────────────────────────────────────┤
│ ResponsesHandler │
├─────────────────────────────────────────────────────────────────┤
│ ComputerAgent + Computer │
└─────────────────────────────────────────────────────────────────┘
```
## Examples
See `examples.py` for complete usage examples:
```bash
# Run HTTP tests
python agent/proxy/examples.py
# Show curl examples
python agent/proxy/examples.py curl
# Test P2P (requires peerjs-python)
python agent/proxy/examples.py p2p
```
## Error Handling
The proxy returns structured error responses:
```json
{
"success": false,
"error": "Error description",
"model": "model-used"
}
```
Common errors:
- Missing required parameters (`model`, `input`)
- Invalid JSON in request body
- Agent execution errors
- Computer setup failures
## Limitations
- Returns only the first result from agent.run() (as requested)
- P2P requires peerjs-python and signaling server
- Computer instances are created per request (not pooled)
- No authentication/authorization built-in
@@ -0,0 +1,8 @@
"""
Proxy module for exposing ComputerAgent over HTTP and P2P connections.
"""
from .server import ProxyServer
from .handlers import ResponsesHandler
__all__ = ['ProxyServer', 'ResponsesHandler']
+21
View File
@@ -0,0 +1,21 @@
"""
CLI entry point for the proxy server.
"""
import asyncio
import sys
from .server import main
def cli():
"""CLI entry point."""
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nShutting down...")
sys.exit(0)
except Exception as e:
print(f"Error: {e}")
sys.exit(1)
if __name__ == "__main__":
cli()
+177
View File
@@ -0,0 +1,177 @@
"""
Example usage of the proxy server and client requests.
"""
import asyncio
import json
import aiohttp
from typing import Dict, Any
async def test_http_endpoint():
"""Test the HTTP /responses endpoint."""
# Example 1: Simple text request
simple_request = {
"model": "anthropic/claude-3-5-sonnet-20241022",
"input": "Tell me a three sentence bedtime story about a unicorn."
}
# Example 2: Multi-modal request with image
multimodal_request = {
"model": "anthropic/claude-3-5-sonnet-20241022",
"input": [
{
"role": "user",
"content": [
{"type": "input_text", "text": "what is in this image?"},
{
"type": "input_image",
"image_url": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg"
}
]
}
]
}
# Example 3: Request with custom agent and computer kwargs
custom_request = {
"model": "anthropic/claude-3-5-sonnet-20241022",
"input": "Take a screenshot and tell me what you see",
"agent_kwargs": {
"save_trajectory": True,
"verbosity": 20 # INFO level
},
"computer_kwargs": {
"os_type": "linux",
"provider_type": "cloud"
}
}
# Test requests
base_url = "http://localhost:8000"
async with aiohttp.ClientSession() as session:
for i, request_data in enumerate([simple_request, multimodal_request, custom_request], 1):
print(f"\n--- Test {i} ---")
print(f"Request: {json.dumps(request_data, indent=2)}")
try:
async with session.post(
f"{base_url}/responses",
json=request_data,
headers={"Content-Type": "application/json"}
) as response:
result = await response.json()
print(f"Status: {response.status}")
print(f"Response: {json.dumps(result, indent=2)}")
except Exception as e:
print(f"Error: {e}")
def curl_examples():
"""Print curl command examples."""
print("=== CURL Examples ===\n")
print("1. Simple text request:")
print("""curl http://localhost:8000/responses \\
-H "Content-Type: application/json" \\
-d '{
"model": "anthropic/claude-3-5-sonnet-20241022",
"input": "Tell me a three sentence bedtime story about a unicorn."
}'""")
print("\n2. Multi-modal request with image:")
print("""curl http://localhost:8000/responses \\
-H "Content-Type: application/json" \\
-d '{
"model": "anthropic/claude-3-5-sonnet-20241022",
"input": [
{
"role": "user",
"content": [
{"type": "input_text", "text": "what is in this image?"},
{
"type": "input_image",
"image_url": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg"
}
]
}
]
}'""")
print("\n3. Request with custom configuration:")
print("""curl http://localhost:8000/responses \\
-H "Content-Type: application/json" \\
-d '{
"model": "anthropic/claude-3-5-sonnet-20241022",
"input": "Take a screenshot and tell me what you see",
"agent_kwargs": {
"save_trajectory": true,
"verbosity": 20
},
"computer_kwargs": {
"os_type": "linux",
"provider_type": "cloud"
}
}'""")
async def test_p2p_client():
"""Example P2P client using peerjs-python."""
try:
from peerjs import Peer, PeerOptions, ConnectionEventType
from aiortc import RTCConfiguration, RTCIceServer
# Set up client peer
options = PeerOptions(
host="0.peerjs.com",
port=443,
secure=True,
config=RTCConfiguration(
iceServers=[RTCIceServer(urls="stun:stun.l.google.com:19302")]
)
)
client_peer = Peer(id="test-client", peer_options=options)
await client_peer.start()
# Connect to proxy server
connection = client_peer.connect("computer-agent-proxy")
@connection.on(ConnectionEventType.Open)
async def connection_open():
print("Connected to proxy server")
# Send a test request
request = {
"model": "anthropic/claude-3-5-sonnet-20241022",
"input": "Hello from P2P client!"
}
await connection.send(json.dumps(request))
@connection.on(ConnectionEventType.Data)
async def connection_data(data):
print(f"Received response: {data}")
await client_peer.destroy()
# Wait for connection
await asyncio.sleep(10)
except ImportError:
print("P2P dependencies not available. Install peerjs-python for P2P testing.")
except Exception as e:
print(f"P2P test error: {e}")
if __name__ == "__main__":
import sys
if len(sys.argv) > 1 and sys.argv[1] == "curl":
curl_examples()
elif len(sys.argv) > 1 and sys.argv[1] == "p2p":
asyncio.run(test_p2p_client())
else:
asyncio.run(test_http_endpoint())
+160
View File
@@ -0,0 +1,160 @@
"""
Request handlers for the proxy endpoints.
"""
import asyncio
import json
import logging
import os
from typing import Dict, Any, List, Union, Optional
from ..agent import ComputerAgent
from computer import Computer
logger = logging.getLogger(__name__)
class ResponsesHandler:
"""Handler for /responses endpoint that processes agent requests."""
def __init__(self):
self.computer = None
self.agent = None
async def setup_computer(self, computer_kwargs: Optional[Dict[str, Any]] = None):
"""Set up computer instance with provided kwargs or defaults."""
if self.computer is not None:
return # Already set up
# Default computer configuration
default_config = {
"os_type": "linux",
"provider_type": "cloud",
"name": os.getenv("CUA_CONTAINER_NAME"),
"api_key": os.getenv("CUA_API_KEY")
}
# Override with provided kwargs
if computer_kwargs:
default_config.update(computer_kwargs)
self.computer = Computer(**default_config)
await self.computer.__aenter__()
logger.info(f"Computer set up with config: {default_config}")
async def setup_agent(self, model: str, agent_kwargs: Optional[Dict[str, Any]] = None):
"""Set up agent instance with provided model and kwargs."""
if self.computer is None:
raise RuntimeError("Computer must be set up before agent")
# Default agent configuration
default_config = {
"model": model,
"tools": [self.computer]
}
# Override with provided kwargs
if agent_kwargs:
# Don't override tools unless explicitly provided
if "tools" not in agent_kwargs:
agent_kwargs["tools"] = [self.computer]
default_config.update(agent_kwargs)
self.agent = ComputerAgent(**default_config)
logger.info(f"Agent set up with model: {model}")
async def process_request(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Process a /responses request and return the result.
Args:
request_data: Dictionary containing model, input, and optional kwargs
Returns:
Dictionary with the agent's response
"""
try:
# Extract request parameters
model = request_data.get("model")
input_data = request_data.get("input")
agent_kwargs = request_data.get("agent_kwargs", {})
computer_kwargs = request_data.get("computer_kwargs", {})
if not model:
raise ValueError("Model is required")
if not input_data:
raise ValueError("Input is required")
# Set up computer and agent
await self.setup_computer(computer_kwargs)
await self.setup_agent(model, agent_kwargs)
# Convert input to messages format
messages = self._convert_input_to_messages(input_data)
# Run agent and get first result
async for result in self.agent.run(messages):
# Return the first result and break
return {
"success": True,
"result": result,
"model": model
}
# If no results were yielded
return {
"success": False,
"error": "No results from agent",
"model": model
}
except Exception as e:
logger.error(f"Error processing request: {e}")
return {
"success": False,
"error": str(e),
"model": request_data.get("model", "unknown")
}
def _convert_input_to_messages(self, input_data: Union[str, List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
"""Convert input data to messages format."""
if isinstance(input_data, str):
# Simple string input
return [{"role": "user", "content": input_data}]
elif isinstance(input_data, list):
# Already in messages format
messages = []
for msg in input_data:
# Convert content array format if needed
if isinstance(msg.get("content"), list):
content_parts = []
for part in msg["content"]:
if part.get("type") == "input_text":
content_parts.append({"type": "text", "text": part["text"]})
elif part.get("type") == "input_image":
content_parts.append({
"type": "image_url",
"image_url": {"url": part["image_url"]}
})
else:
content_parts.append(part)
messages.append({
"role": msg["role"],
"content": content_parts
})
else:
messages.append(msg)
return messages
else:
raise ValueError("Input must be string or list of messages")
async def cleanup(self):
"""Clean up resources."""
if self.computer:
try:
await self.computer.__aexit__(None, None, None)
except Exception as e:
logger.error(f"Error cleaning up computer: {e}")
finally:
self.computer = None
self.agent = None
+185
View File
@@ -0,0 +1,185 @@
"""
P2P server implementation using peerjs-python for WebRTC connections.
"""
import asyncio
import json
import logging
from typing import Dict, Any, Optional
logger = logging.getLogger(__name__)
class P2PServer:
"""P2P server using peerjs-python for WebRTC connections."""
def __init__(self, handler, peer_id: Optional[str] = None, signaling_server: Optional[Dict[str, Any]] = None):
self.handler = handler
self.peer_id = peer_id or "computer-agent-proxy"
self.signaling_server = signaling_server or {
"host": "0.peerjs.com",
"port": 443,
"secure": True
}
self.peer = None
async def start(self):
"""Start P2P server with WebRTC connections."""
try:
from peerjs import Peer, PeerOptions, PeerEventType, ConnectionEventType
from aiortc import RTCConfiguration, RTCIceServer
# Set up peer options
ice_servers = [
{"urls": "stun:stun.l.google.com:19302"},
{"urls": "stun:stun1.l.google.com:19302"}
]
options = PeerOptions(
host=self.signaling_server["host"],
port=self.signaling_server["port"],
secure=self.signaling_server["secure"],
config=RTCConfiguration(
iceServers=[RTCIceServer(**srv) for srv in ice_servers]
)
)
# Create peer
self.peer = Peer(id=self.peer_id, peer_options=options)
await self.peer.start()
logger.info(f"P2P peer started with ID: {self.peer_id}")
# Set up connection handlers
@self.peer.on(PeerEventType.Connection)
async def peer_connection(peer_connection):
logger.info(f"Remote peer {peer_connection.peer} trying to establish connection")
await self._setup_connection_handlers(peer_connection)
@self.peer.on(PeerEventType.Error)
async def peer_error(error):
logger.error(f"Peer error: {error}")
# Keep the server running
while True:
await asyncio.sleep(1)
except ImportError as e:
logger.error(f"P2P dependencies not available: {e}")
logger.error("Install peerjs-python: pip install peerjs-python")
raise
except Exception as e:
logger.error(f"Error starting P2P server: {e}")
raise
async def _setup_connection_handlers(self, peer_connection):
"""Set up handlers for a peer connection."""
try:
from peerjs import ConnectionEventType
@peer_connection.on(ConnectionEventType.Open)
async def connection_open():
logger.info(f"Connection opened with peer {peer_connection.peer}")
# Send welcome message
welcome_msg = {
"type": "welcome",
"message": "Connected to ComputerAgent Proxy",
"endpoints": ["/responses"]
}
await peer_connection.send(json.dumps(welcome_msg))
@peer_connection.on(ConnectionEventType.Data)
async def connection_data(data):
logger.debug(f"Data received from peer {peer_connection.peer}: {data}")
try:
# Parse the incoming data
if isinstance(data, str):
request_data = json.loads(data)
else:
request_data = data
# Check if it's an HTTP-like request
if self._is_http_request(request_data):
response = await self._handle_http_request(request_data)
await peer_connection.send(json.dumps(response))
else:
# Direct API request
result = await self.handler.process_request(request_data)
await peer_connection.send(json.dumps(result))
except json.JSONDecodeError:
error_response = {
"success": False,
"error": "Invalid JSON in request"
}
await peer_connection.send(json.dumps(error_response))
except Exception as e:
logger.error(f"Error processing P2P request: {e}")
error_response = {
"success": False,
"error": str(e)
}
await peer_connection.send(json.dumps(error_response))
@peer_connection.on(ConnectionEventType.Close)
async def connection_close():
logger.info(f"Connection closed with peer {peer_connection.peer}")
@peer_connection.on(ConnectionEventType.Error)
async def connection_error(error):
logger.error(f"Connection error with peer {peer_connection.peer}: {error}")
except Exception as e:
logger.error(f"Error setting up connection handlers: {e}")
def _is_http_request(self, data: Dict[str, Any]) -> bool:
"""Check if the data looks like an HTTP request."""
return (
isinstance(data, dict) and
"method" in data and
"path" in data and
data.get("method") == "POST" and
data.get("path") == "/responses"
)
async def _handle_http_request(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
"""Handle HTTP-like request over P2P."""
try:
method = request_data.get("method")
path = request_data.get("path")
body = request_data.get("body", {})
if method == "POST" and path == "/responses":
# Process the request body
result = await self.handler.process_request(body)
return {
"status": 200,
"headers": {"Content-Type": "application/json"},
"body": result
}
else:
return {
"status": 404,
"headers": {"Content-Type": "application/json"},
"body": {"success": False, "error": "Endpoint not found"}
}
except Exception as e:
logger.error(f"Error handling HTTP request: {e}")
return {
"status": 500,
"headers": {"Content-Type": "application/json"},
"body": {"success": False, "error": str(e)}
}
async def stop(self):
"""Stop the P2P server."""
if self.peer:
try:
await self.peer.destroy()
logger.info("P2P peer stopped")
except Exception as e:
logger.error(f"Error stopping P2P peer: {e}")
finally:
self.peer = None
+175
View File
@@ -0,0 +1,175 @@
"""
Proxy server implementation supporting both HTTP and P2P connections.
"""
import asyncio
import json
import logging
from typing import Dict, Any, Optional
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
import uvicorn
from .handlers import ResponsesHandler
logger = logging.getLogger(__name__)
class ProxyServer:
"""Proxy server that can serve over HTTP and P2P."""
def __init__(self, host: str = "0.0.0.0", port: int = 8000):
self.host = host
self.port = port
self.handler = ResponsesHandler()
self.app = self._create_app()
def _create_app(self) -> Starlette:
"""Create Starlette application with routes."""
async def responses_endpoint(request: Request) -> JSONResponse:
"""Handle POST /responses requests."""
try:
# Parse JSON body
body = await request.body()
request_data = json.loads(body.decode('utf-8'))
# Process the request
result = await self.handler.process_request(request_data)
return JSONResponse(result)
except json.JSONDecodeError:
return JSONResponse({
"success": False,
"error": "Invalid JSON in request body"
}, status_code=400)
except Exception as e:
logger.error(f"Error in responses endpoint: {e}")
return JSONResponse({
"success": False,
"error": str(e)
}, status_code=500)
async def health_endpoint(request: Request) -> JSONResponse:
"""Health check endpoint."""
return JSONResponse({"status": "healthy"})
routes = [
Route("/responses", responses_endpoint, methods=["POST"]),
Route("/health", health_endpoint, methods=["GET"]),
]
middleware = [
Middleware(CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"])
]
return Starlette(routes=routes, middleware=middleware)
async def start_http(self):
"""Start HTTP server."""
logger.info(f"Starting HTTP server on {self.host}:{self.port}")
config = uvicorn.Config(
self.app,
host=self.host,
port=self.port,
log_level="info"
)
server = uvicorn.Server(config)
await server.serve()
async def start_p2p(self, peer_id: Optional[str] = None, signaling_server: Optional[Dict[str, Any]] = None):
"""Start P2P server using peerjs-python."""
try:
from .p2p_server import P2PServer
p2p_server = P2PServer(
handler=self.handler,
peer_id=peer_id,
signaling_server=signaling_server
)
await p2p_server.start()
except ImportError:
logger.error("P2P dependencies not available. Install peerjs-python for P2P support.")
raise
async def start(self, mode: str = "http", **kwargs):
"""
Start the server in specified mode.
Args:
mode: "http", "p2p", or "both"
**kwargs: Additional arguments for specific modes
"""
if mode == "http":
await self.start_http()
elif mode == "p2p":
await self.start_p2p(**kwargs)
elif mode == "both":
# Start both HTTP and P2P servers concurrently
tasks = [
asyncio.create_task(self.start_http()),
asyncio.create_task(self.start_p2p(**kwargs))
]
await asyncio.gather(*tasks)
else:
raise ValueError(f"Invalid mode: {mode}. Must be 'http', 'p2p', or 'both'")
async def cleanup(self):
"""Clean up resources."""
await self.handler.cleanup()
async def main():
"""Main entry point for running the proxy server."""
import argparse
parser = argparse.ArgumentParser(description="ComputerAgent Proxy Server")
parser.add_argument("--host", default="0.0.0.0", help="Host to bind to")
parser.add_argument("--port", type=int, default=8000, help="Port to bind to")
parser.add_argument("--mode", choices=["http", "p2p", "both"], default="http",
help="Server mode")
parser.add_argument("--peer-id", help="Peer ID for P2P mode")
parser.add_argument("--signaling-host", default="0.peerjs.com", help="Signaling server host")
parser.add_argument("--signaling-port", type=int, default=443, help="Signaling server port")
parser.add_argument("--signaling-secure", action="store_true", help="Use secure signaling")
args = parser.parse_args()
# Set up logging
logging.basicConfig(level=logging.INFO)
# Create server
server = ProxyServer(host=args.host, port=args.port)
# Prepare P2P kwargs if needed
p2p_kwargs = {}
if args.mode in ["p2p", "both"]:
p2p_kwargs = {
"peer_id": args.peer_id,
"signaling_server": {
"host": args.signaling_host,
"port": args.signaling_port,
"secure": args.signaling_secure
}
}
try:
await server.start(mode=args.mode, **p2p_kwargs)
except KeyboardInterrupt:
logger.info("Shutting down server...")
finally:
await server.cleanup()
if __name__ == "__main__":
asyncio.run(main())