diff --git a/libs/python/agent/agent/proxy/README.md b/libs/python/agent/agent/proxy/README.md
deleted file mode 100644
index c4f8dc88..00000000
--- a/libs/python/agent/agent/proxy/README.md
+++ /dev/null
@@ -1,232 +0,0 @@
-# ComputerAgent Proxy
-
-A proxy server that exposes ComputerAgent functionality over HTTP and P2P (WebRTC) connections. This allows remote clients to interact with ComputerAgent instances through a simple REST-like API.
-
-## Installation
-
-The proxy requires additional dependencies:
-
-```bash
-# For HTTP server
-pip install starlette uvicorn
-
-# For P2P server (optional)
-pip install peerjs-python aiortc
-```
-
-## Usage
-
-### Starting the Server
-
-```bash
-# HTTP server only (default)
-python -m agent.proxy.cli
-
-# Custom host/port
-python -m agent.proxy.cli --host 0.0.0.0 --port 8080
-
-# P2P server only
-python -m agent.proxy.cli --mode p2p
-
-# Both HTTP and P2P
-python -m agent.proxy.cli --mode both
-```
-
-### API Endpoints
-
-#### POST /responses
-
-Process a request using ComputerAgent and return the first result.
-
-**Request Format:**
-```json
-{
-  "model": "anthropic/claude-3-5-sonnet-20241022",
-  "input": "Your instruction here",
-  # Optional agent configuration, passed to ComputerAgent constructor
-  "agent_kwargs": {
-    "save_trajectory": true,
-    "verbosity": 20
-  },
-  # Optional computer configuration, passed to the Computer constructor
-  "computer_kwargs": {
-    "os_type": "linux",
-    "provider_type": "cloud"
-  },
-  # Optional environment overrides for this request
-  "env": {
-    "OPENROUTER_API_KEY": "your-openrouter-api-key",
-    # etc.
-    # ref: https://docs.litellm.ai/docs/proxy/config_settings#environment-variables---reference
-  }
-}
-```
-
-**Multi-modal Request:**
-```json
-{
-  "model": "anthropic/claude-3-5-sonnet-20241022",
-  "input": [
-    {
-      "role": "user",
-      "content": [
-        {"type": "input_text", "text": "what is in this image?"},
-        {
-          "type": "input_image",
-          "image_url": "https://example.com/image.jpg"
-        }
-      ]
-    }
-  ]
-}
-```
-
-**Response Format:**
-```json
-{
-  "result": {
-    // Agent response data
-  },
-  "model": "anthropic/claude-3-5-sonnet-20241022"
-}
-```
-
-#### GET /health
-
-Health check endpoint.
-
-### cURL Examples
-
-```bash
-# Simple text request
-curl http://localhost:8000/responses \
-  -H "Content-Type: application/json" \
-  -d '{
-    "model": "anthropic/claude-3-5-sonnet-20241022",
-    "input": "Tell me a three sentence bedtime story about a unicorn.",
-    "env": {"CUA_API_KEY": "override-key-for-this-request"}
-  }'

-# Multi-modal request
-curl http://localhost:8000/responses \
-  -H "Content-Type: application/json" \
-  -d '{
-    "model": "anthropic/claude-3-5-sonnet-20241022",
-    "input": [
-      {
-        "role": "user",
-        "content": [
-          {"type": "input_text", "text": "what is in this image?"},
-          {
-            "type": "input_image",
-            "image_url": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg"
-          }
-        ]
-      }
-    ]
-  }'
-```
-
-### P2P Usage
-
-The P2P server allows WebRTC connections for direct peer-to-peer communication:
-
-```python
-# Connect to P2P proxy
-from peerjs import Peer, PeerOptions
-import json
-
-peer = Peer(id="client", peer_options=PeerOptions(host="localhost", port=9000))
-await peer.start()
-
-connection = peer.connect("computer-agent-proxy")
-
-# Send request
-request = {
-    "model": "anthropic/claude-3-5-sonnet-20241022",
-    "input": "Hello from P2P!"
-}
-await connection.send(json.dumps(request))
-```
-
-## Configuration
-
-### Request Parameters
-
-- `model`: Model string (required) - e.g., "anthropic/claude-3-5-sonnet-20241022"
-- `input`: String or message array (required)
-- `agent_kwargs`: Optional agent configuration
-- `computer_kwargs`: Optional computer configuration
-- `env`: Object - key/value environment variables to override for this request
-
-### Agent Configuration (`agent_kwargs`)
-
-Common options:
-- `save_trajectory`: Boolean - Save conversation trajectory
-- `verbosity`: Integer - Logging level (10=DEBUG, 20=INFO, etc.)
-- `max_trajectory_budget`: Float - Budget limit for trajectory
-
-### Computer Configuration (`computer_kwargs`)
-
-Common options:
-- `os_type`: String - "linux", "windows", "macos"
-- `provider_type`: String - "cloud", "local", "docker"
-- `name`: String - Instance name
-- `api_key`: String - Provider API key
-
-## Architecture
-
-```
-┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
-│   HTTP Client   │    │   P2P Client    │    │  Direct Usage   │
-└─────────┬───────┘    └─────────┬───────┘    └─────────┬───────┘
-          │                      │                      │
-          ▼                      ▼                      ▼
-┌─────────────────────────────────────────────────────────────────┐
-│                           ProxyServer                           │
-├─────────────────────────────────────────────────────────────────┤
-│                        ResponsesHandler                         │
-├─────────────────────────────────────────────────────────────────┤
-│                     ComputerAgent + Computer                     │
-└─────────────────────────────────────────────────────────────────┘
-```
-
-## Examples
-
-See `examples.py` for complete usage examples:
-
-```bash
-# Run HTTP tests
-python agent/proxy/examples.py
-
-# Show curl examples
-python agent/proxy/examples.py curl
-
-# Test P2P (requires peerjs-python)
-python agent/proxy/examples.py p2p
-```
-
-## Error Handling
-
-The proxy returns structured error responses:
-
-```json
-{
-  "success": false,
-  "error": "Error description",
-  "model": "model-used"
-}
-```
-
-Common errors:
-- Missing required parameters (`model`, `input`)
-- Invalid JSON in request body
-- Agent execution errors
-- Computer setup failures
-
-## Limitations
-
-- Returns only the first result from agent.run() (as requested)
-- P2P requires peerjs-python and signaling server
-- Computer instances are created per request (not pooled)
-- No authentication/authorization built-in
diff --git a/libs/python/agent/agent/proxy/__init__.py b/libs/python/agent/agent/proxy/__init__.py
deleted file mode 100644
index 294f5773..00000000
--- a/libs/python/agent/agent/proxy/__init__.py
+++ /dev/null
@@ -1,8 +0,0 @@
-"""
-Proxy module for exposing ComputerAgent over HTTP and P2P connections.
-"""
-
-from .server import ProxyServer
-from .handlers import ResponsesHandler
-
-__all__ = ['ProxyServer', 'ResponsesHandler']
diff --git a/libs/python/agent/agent/proxy/cli.py b/libs/python/agent/agent/proxy/cli.py
deleted file mode 100644
index 5945320b..00000000
--- a/libs/python/agent/agent/proxy/cli.py
+++ /dev/null
@@ -1,21 +0,0 @@
-"""
-CLI entry point for the proxy server.
-"""
-
-import asyncio
-import sys
-from .server import main
-
-def cli():
-    """CLI entry point."""
-    try:
-        asyncio.run(main())
-    except KeyboardInterrupt:
-        print("\nShutting down...")
-        sys.exit(0)
-    except Exception as e:
-        print(f"Error: {e}")
-        sys.exit(1)
-
-if __name__ == "__main__":
-    cli()
diff --git a/libs/python/agent/agent/proxy/examples.py b/libs/python/agent/agent/proxy/examples.py
deleted file mode 100644
index 617a7270..00000000
--- a/libs/python/agent/agent/proxy/examples.py
+++ /dev/null
@@ -1,219 +0,0 @@
-"""
-Example usage of the proxy server and client requests.
-"""
-import dotenv
-dotenv.load_dotenv()
-
-import asyncio
-import json
-import os
-import aiohttp
-from typing import Dict, Any
-
-def print_agent_response(result: dict):
-    # Pretty-print AgentResponse per your schema
-    output = result.get("output", []) or []
-    usage = result.get("usage") or {}
-
-    for msg in output:
-        t = msg.get("type")
-        if t == "message":
-            role = msg.get("role")
-            if role == "assistant":
-                for c in msg.get("content", []):
-                    if c.get("type") == "output_text":
-                        print(f"assistant> {c.get('text','')}")
-        elif t == "reasoning":
-            for s in msg.get("summary", []):
-                if s.get("type") == "summary_text":
-                    print(f"(thought) {s.get('text','')}")
-        elif t == "computer_call":
-            action = msg.get("action", {})
-            a_type = action.get("type", "action")
-            # Compact action preview (omit bulky fields)
-            preview = {k: v for k, v in action.items() if k not in ("type", "path", "image")}
-            print(f"🛠 computer_call {a_type} {preview} (id={msg.get('call_id')})")
-        elif t == "computer_call_output":
-            print(f"🖼 screenshot (id={msg.get('call_id')})")
-        elif t == "function_call":
-            print(f"🔧 fn {msg.get('name')}({msg.get('arguments')}) (id={msg.get('call_id')})")
-        elif t == "function_call_output":
-            print(f"🔧 fn result: {msg.get('output')} (id={msg.get('call_id')})")
-
-    if usage:
-        print(
-            f"[usage] prompt={usage.get('prompt_tokens',0)} "
-            f"completion={usage.get('completion_tokens',0)} "
-            f"total={usage.get('total_tokens',0)} "
-            f"cost=${usage.get('response_cost',0)}"
-        )
-
-
-async def test_http_endpoint():
-    """Test the HTTP /responses endpoint."""
-
-    openai_api_key = os.getenv("OPENAI_API_KEY")
-    assert isinstance(openai_api_key, str), "OPENAI_API_KEY environment variable must be set"
-    anthropic_api_key = os.getenv("ANTHROPIC_API_KEY")
-    assert isinstance(anthropic_api_key, str), "ANTHROPIC_API_KEY environment variable must be set"
-
-    # Test requests
-    base_url = "https://m-linux-96lcxd2c2k.containers.cloud.trycua.com:8443"
-    # base_url = "http://localhost:8000"
-    api_key = os.getenv("CUA_API_KEY")
-    assert isinstance(api_key, str), "CUA_API_KEY environment variable must be set"
-
-    async with aiohttp.ClientSession() as session:
-        for i, request_data in enumerate([
-            # ==== Request Body Examples ====
-
-            # Simple text request
-            {
-                "model": "anthropic/claude-3-5-sonnet-20241022",
-                "input": "Hello!",
-                "env": {
-                    "ANTHROPIC_API_KEY": anthropic_api_key
-                }
-            },
-
-            # {
-            #     "model": "openai/computer-use-preview",
-            #     "input": "Hello!",
-            #     "env": {
-            #         "OPENAI_API_KEY": openai_api_key
-            #     }
-            # },
-
-            # Multimodal request with image
-            # {
-            #     "model": "anthropic/claude-3-5-sonnet-20241022",
-            #     "input": [
-            #         {
-            #             "role": "user",
-            #             "content": [
-            #                 {"type": "input_text", "text": "what is in this image?"},
-            #                 {
-            #                     "type": "input_image",
-            #                     "image_url": "https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg"
-            #                 }
-            #             ]
-            #         }
-            #     ],
-            #     "env": {
-            #         "ANTHROPIC_API_KEY": anthropic_api_key
-            #     }
-            # }
-
-        ], 1):
-            print(f"\n--- Test {i} ---")
-            print(f"Request: {json.dumps(request_data, indent=2)}")
-
-            try:
-                print(f"Sending request to {base_url}/responses")
-                async with session.post(
-                    f"{base_url}/responses",
-                    json=request_data,
-                    headers={"Content-Type": "application/json", "X-API-Key": api_key}
-                ) as response:
-                    text_result = await response.text()
-                    print(f"Response Text: {text_result}")
-
-                    result = json.loads(text_result)
-                    print(f"Status: {response.status}")
-                    print(f"Response: {json.dumps(result, indent=2)}")
-                    print(f"Response Headers:")
-                    for header in response.headers:
-                        print(f"- {header}: {response.headers[header]}")
-
-            except Exception as e:
-                print(f"Error: {e}")
-
-async def simple_repl():
-    base_url = "https://m-linux-96lcxd2c2k.containers.cloud.trycua.com:8443"
-    api_key = os.getenv("CUA_API_KEY", "")
-    anthropic_api_key = os.getenv("ANTHROPIC_API_KEY", "")
-    openai_api_key = os.getenv("OPENAI_API_KEY", "")
-    model = "openai/computer-use-preview"
-
-    messages = []
-    async with aiohttp.ClientSession() as session:
-        while True:
-            if not messages or messages[-1].get("type") == "message":
-                user_text = input("you> ").strip()
-                if user_text == "exit":
-                    break
-                if user_text:
-                    messages += [{"role": "user", "content": user_text}] # loop
-
-            payload = {
-                "model": model,
-                "input": messages,
-                "env": {
-                    "ANTHROPIC_API_KEY": anthropic_api_key,
-                    "OPENAI_API_KEY": openai_api_key
-                }
-            }
-            async with session.post(f"{base_url}/responses",
-                                    json=payload,
-                                    headers={"Content-Type": "application/json", "X-API-Key": api_key}) as resp:
-                result = json.loads(await resp.text())
-                print_agent_response(result)

-                messages += result.get("output", []) # request
-
-async def test_p2p_client():
-    """Example P2P client using peerjs-python."""
-    try:
-        from peerjs import Peer, PeerOptions, ConnectionEventType
-        from aiortc import RTCConfiguration, RTCIceServer
-
-        # Set up client peer
-        options = PeerOptions(
-            host="0.peerjs.com",
-            port=443,
-            secure=True,
-            config=RTCConfiguration(
-                iceServers=[RTCIceServer(urls="stun:stun.l.google.com:19302")]
-            )
-        )
-
-        client_peer = Peer(id="test-client", peer_options=options)
-        await client_peer.start()
-
-        # Connect to proxy server
-        connection = client_peer.connect("computer-agent-proxy")
-
-        @connection.on(ConnectionEventType.Open)
-        async def connection_open():
-            print("Connected to proxy server")
-
-            # Send a test request
-            request = {
-                "model": "anthropic/claude-3-5-sonnet-20241022",
-                "input": "Hello from P2P client!"
-            }
-            await connection.send(json.dumps(request))
-
-        @connection.on(ConnectionEventType.Data)
-        async def connection_data(data):
-            print(f"Received response: {data}")
-            await client_peer.destroy()
-
-        # Wait for connection
-        await asyncio.sleep(10)
-
-    except ImportError:
-        print("P2P dependencies not available. Install peerjs-python for P2P testing.")
-    except Exception as e:
-        print(f"P2P test error: {e}")
-
-
-if __name__ == "__main__":
-    import sys
-
-    if len(sys.argv) > 1 and sys.argv[1] == "p2p":
-        asyncio.run(test_p2p_client())
-    elif len(sys.argv) > 1 and sys.argv[1] == "repl":
-        asyncio.run(simple_repl())
-    else:
-        asyncio.run(test_http_endpoint())
diff --git a/libs/python/agent/agent/proxy/handlers.py b/libs/python/agent/agent/proxy/handlers.py
deleted file mode 100644
index 5260e98c..00000000
--- a/libs/python/agent/agent/proxy/handlers.py
+++ /dev/null
@@ -1,243 +0,0 @@
-"""
-Request handlers for the proxy endpoints.
-"""
-
-import asyncio
-import json
-import logging
-import os
-from contextlib import contextmanager
-from typing import Dict, Any, List, Union, Optional
-
-from ..agent import ComputerAgent
-from computer import Computer
-
-logger = logging.getLogger(__name__)
-
-
-class ResponsesHandler:
-    """Handler for /responses endpoint that processes agent requests."""
-
-    def __init__(self):
-        self.computer = None
-        self.agent = None
-        # Simple in-memory caches
-        self._computer_cache: Dict[str, Any] = {}
-        self._agent_cache: Dict[str, Any] = {}
-
-    async def setup_computer_agent(
-        self,
-        model: str,
-        agent_kwargs: Optional[Dict[str, Any]] = None,
-        computer_kwargs: Optional[Dict[str, Any]] = None,
-    ):
-        """Set up (and cache) computer and agent instances.
-
-        Caching keys:
-        - Computer cache key: computer_kwargs
-        - Agent cache key: {"model": model, **agent_kwargs}
-        """
-        agent_kwargs = agent_kwargs or {}
-        computer_kwargs = computer_kwargs or {}
-
-        def _stable_key(obj: Dict[str, Any]) -> str:
-            try:
-                return json.dumps(obj, sort_keys=True, separators=(",", ":"))
-            except Exception:
-                # Fallback: stringify non-serializable values
-                safe_obj = {}
-                for k, v in obj.items():
-                    try:
-                        json.dumps(v)
-                        safe_obj[k] = v
-                    except Exception:
-                        safe_obj[k] = str(v)
-                return json.dumps(safe_obj, sort_keys=True, separators=(",", ":"))
-
-        # ---------- Computer setup (with cache) ----------
-        comp_key = _stable_key(computer_kwargs)
-
-        computer = self._computer_cache.get(comp_key)
-        if computer is None:
-            # Default computer configuration
-            default_c_config = {
-                "os_type": "linux",
-                "provider_type": "cloud",
-                "name": os.getenv("CUA_CONTAINER_NAME"),
-                "api_key": os.getenv("CUA_API_KEY"),
-            }
-            default_c_config.update(computer_kwargs)
-            computer = Computer(**default_c_config)
-            await computer.__aenter__()
-            self._computer_cache[comp_key] = computer
-            logger.info(f"Computer created and cached with key={comp_key} config={default_c_config}")
-        else:
-            logger.info(f"Reusing cached computer for key={comp_key}")
-
-        # Bind current computer reference
-        self.computer = computer
-
-        # ---------- Agent setup (with cache) ----------
-        # Build agent cache key from {model} + agent_kwargs (excluding tools unless explicitly passed)
-        agent_kwargs_for_key = dict(agent_kwargs)
-        agent_key_payload = {"model": model, **agent_kwargs_for_key}
-        agent_key = _stable_key(agent_key_payload)
-
-        agent = self._agent_cache.get(agent_key)
-        if agent is None:
-            # Default agent configuration
-            default_a_config = {
-                "model": model,
-                "tools": [computer],
-            }
-            # Apply user overrides, but keep tools unless user explicitly sets
-            if agent_kwargs:
-                if "tools" not in agent_kwargs:
-                    agent_kwargs["tools"] = [computer]
-                default_a_config.update(agent_kwargs)
-            agent = ComputerAgent(**default_a_config)
-            self._agent_cache[agent_key] = agent
-            logger.info(f"Agent created and cached with key={agent_key} model={model}")
-        else:
-            # Ensure cached agent uses the current computer tool (in case object differs)
-            # Only update if tools not explicitly provided in agent_kwargs
-            if "tools" not in agent_kwargs:
-                try:
-                    agent.tools = [computer]
-                except Exception:
-                    pass
-            logger.info(f"Reusing cached agent for key={agent_key}")
-
-        # Bind current agent reference
-        self.agent = agent
-
-    async def process_request(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
-        """
-        Process a /responses request and return the result.
-
-        Args:
-            request_data: Dictionary containing model, input, and optional kwargs
-
-        Returns:
-            Dictionary with the agent's response
-        """
-        try:
-            # Extract request parameters
-            model = request_data.get("model")
-            input_data = request_data.get("input")
-            agent_kwargs = request_data.get("agent_kwargs", {})
-            computer_kwargs = request_data.get("computer_kwargs", {})
-            env_overrides = request_data.get("env", {}) or {}
-
-            if not model:
-                raise ValueError("Model is required")
-            if not input_data:
-                raise ValueError("Input is required")
-
-            # Apply env overrides for the duration of this request
-            with self._env_overrides(env_overrides):
-                # Set up (and possibly reuse) computer and agent via caches
-                await self.setup_computer_agent(model, agent_kwargs, computer_kwargs)
-
-                # Defensive: ensure agent is initialized for type checkers
-                agent = self.agent
-                if agent is None:
-                    raise RuntimeError("Agent failed to initialize")
-
-                # Convert input to messages format
-                messages = self._convert_input_to_messages(input_data)
-
-                # Run agent and get first result
-                async for result in agent.run(messages):
-                    # Return the first result and break
-                    return {
-                        "success": True,
-                        "result": result,
-                        "model": model
-                    }
-
-                # If no results were yielded
-                return {
-                    "success": False,
-                    "error": "No results from agent",
-                    "model": model
-                }
-
-        except Exception as e:
-            logger.error(f"Error processing request: {e}")
-            return {
-                "success": False,
-                "error": str(e),
-                "model": request_data.get("model", "unknown")
-            }
-
-    def _convert_input_to_messages(self, input_data: Union[str, List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
-        """Convert input data to messages format."""
-        if isinstance(input_data, str):
-            # Simple string input
-            return [{"role": "user", "content": input_data}]
-        elif isinstance(input_data, list):
-            # Already in messages format
-            messages = []
-            for msg in input_data:
-                # Convert content array format if needed
-                if isinstance(msg.get("content"), list):
-                    content_parts = []
-                    for part in msg["content"]:
-                        if part.get("type") == "input_text":
-                            content_parts.append({"type": "text", "text": part["text"]})
-                        elif part.get("type") == "input_image":
-                            content_parts.append({
-                                "type": "image_url",
-                                "image_url": {"url": part["image_url"]}
-                            })
-                        else:
-                            content_parts.append(part)
-                    messages.append({
-                        "role": msg["role"],
-                        "content": content_parts
-                    })
-                else:
-                    messages.append(msg)
-            return messages
-        else:
-            raise ValueError("Input must be string or list of messages")
-
-    async def cleanup(self):
-        """Clean up resources."""
-        if self.computer:
-            try:
-                await self.computer.__aexit__(None, None, None)
-            except Exception as e:
-                logger.error(f"Error cleaning up computer: {e}")
-            finally:
-                self.computer = None
-                self.agent = None
-
-    @staticmethod
-    @contextmanager
-    def _env_overrides(env: Dict[str, str]):
-        """Temporarily apply environment variable overrides for the current process.
-        Restores previous values after the context exits.
-
-        Args:
-            env: Mapping of env var names to override for this request.
-        """
-        if not env:
-            # No-op context
-            yield
-            return
-
-        original: Dict[str, Optional[str]] = {}
-        try:
-            for k, v in env.items():
-                original[k] = os.environ.get(k)
-                os.environ[k] = str(v)
-            yield
-        finally:
-            for k, old in original.items():
-                if old is None:
-                    # Was not set before
-                    os.environ.pop(k, None)
-                else:
-                    os.environ[k] = old
diff --git a/libs/python/agent/agent/proxy/p2p_server.py b/libs/python/agent/agent/proxy/p2p_server.py
deleted file mode 100644
index e658428b..00000000
--- a/libs/python/agent/agent/proxy/p2p_server.py
+++ /dev/null
@@ -1,188 +0,0 @@
-"""
-P2P server implementation using peerjs-python for WebRTC connections.
-"""
-
-import asyncio
-import json
-import logging
-import uuid
-from typing import Dict, Any, Optional
-
-logger = logging.getLogger(__name__)
-
-
-class P2PServer:
-    """P2P server using peerjs-python for WebRTC connections."""
-
-    def __init__(self, handler, peer_id: Optional[str] = None, signaling_server: Optional[Dict[str, Any]] = None):
-        self.handler = handler
-        self.peer_id = peer_id or uuid.uuid4().hex
-        self.signaling_server = signaling_server or {
-            "host": "0.peerjs.com",
-            "port": 443,
-            "secure": True
-        }
-        self.peer = None
-
-    async def start(self):
-        """Start P2P server with WebRTC connections."""
-        try:
-            from peerjs_py.peer import Peer, PeerOptions
-            from peerjs_py.enums import PeerEventType, ConnectionEventType
-
-            # Set up peer options
-            ice_servers = [
-                {"urls": "stun:stun.l.google.com:19302"},
-                {"urls": "stun:stun1.l.google.com:19302"}
-            ]
-
-            # Create peer with PeerOptions (config should be a dict, not RTCConfiguration)
-            peer_options = PeerOptions(
-                host=self.signaling_server["host"],
-                port=self.signaling_server["port"],
-                secure=self.signaling_server["secure"],
-                config={
-                    "iceServers": ice_servers
-                }
-            )
-
-            # Create peer
-            self.peer = Peer(id=self.peer_id, options=peer_options)
-            await self.peer.start()
-            # logger.info(f"P2P peer started with ID: {self.peer_id}")
-            print(f"Agent proxy started at peer://{self.peer_id}")
-
-            # Set up connection handlers using string event names
-            @self.peer.on('connection')
-            async def peer_connection(peer_connection):
-                logger.info(f"Remote peer {peer_connection.peer} trying to establish connection")
-                await self._setup_connection_handlers(peer_connection)
-
-            @self.peer.on('error')
-            async def peer_error(error):
-                logger.error(f"Peer error: {error}")
-
-            # Keep the server running
-            while True:
-                await asyncio.sleep(1)
-
-        except ImportError as e:
-            logger.error(f"P2P dependencies not available: {e}")
-            logger.error("Install peerjs-python: pip install peerjs-python")
-            raise
-        except Exception as e:
-            logger.error(f"Error starting P2P server: {e}")
-            raise
-
-    async def _setup_connection_handlers(self, peer_connection):
-        """Set up handlers for a peer connection."""
-        try:
-            # Use string event names instead of enum types
-
-            @peer_connection.on('open')
-            async def connection_open():
-                logger.info(f"Connection opened with peer {peer_connection.peer}")
-
-                # Send welcome message
-                welcome_msg = {
-                    "type": "welcome",
-                    "message": "Connected to ComputerAgent Proxy",
-                    "endpoints": ["/responses"]
-                }
-                await peer_connection.send(json.dumps(welcome_msg))
-
-            @peer_connection.on('data')
-            async def connection_data(data):
-                logger.debug(f"Data received from peer {peer_connection.peer}: {data}")
-
-                try:
-                    # Parse the incoming data
-                    if isinstance(data, str):
-                        request_data = json.loads(data)
-                    else:
-                        request_data = data
-
-                    # Check if it's an HTTP-like request
-                    if self._is_http_request(request_data):
-                        response = await self._handle_http_request(request_data)
-                        await peer_connection.send(json.dumps(response))
-                    else:
-                        # Direct API request
-                        result = await self.handler.process_request(request_data)
-                        await peer_connection.send(json.dumps(result))
-
-                except json.JSONDecodeError:
-                    error_response = {
-                        "success": False,
-                        "error": "Invalid JSON in request"
-                    }
-                    await peer_connection.send(json.dumps(error_response))
-                except Exception as e:
-                    logger.error(f"Error processing P2P request: {e}")
-                    error_response = {
-                        "success": False,
-                        "error": str(e)
-                    }
-                    await peer_connection.send(json.dumps(error_response))
-
-            @peer_connection.on('close')
-            async def connection_close():
-                logger.info(f"Connection closed with peer {peer_connection.peer}")
-
-            @peer_connection.on('error')
-            async def connection_error(error):
-                logger.error(f"Connection error with peer {peer_connection.peer}: {error}")
-
-        except Exception as e:
-            logger.error(f"Error setting up connection handlers: {e}")
-
-    def _is_http_request(self, data: Dict[str, Any]) -> bool:
-        """Check if the data looks like an HTTP request."""
-        return (
-            isinstance(data, dict) and
-            "method" in data and
-            "path" in data and
-            data.get("method") == "POST" and
-            data.get("path") == "/responses"
-        )
-
-    async def _handle_http_request(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
-        """Handle HTTP-like request over P2P."""
-        try:
-            method = request_data.get("method")
-            path = request_data.get("path")
-            body = request_data.get("body", {})
-
-            if method == "POST" and path == "/responses":
-                # Process the request body
-                result = await self.handler.process_request(body)
-                return {
-                    "status": 200,
-                    "headers": {"Content-Type": "application/json"},
-                    "body": result
-                }
-            else:
-                return {
-                    "status": 404,
-                    "headers": {"Content-Type": "application/json"},
-                    "body": {"success": False, "error": "Endpoint not found"}
-                }
-
-        except Exception as e:
-            logger.error(f"Error handling HTTP request: {e}")
-            return {
-                "status": 500,
-                "headers": {"Content-Type": "application/json"},
-                "body": {"success": False, "error": str(e)}
-            }
-
-    async def stop(self):
-        """Stop the P2P server."""
-        if self.peer:
-            try:
-                await self.peer.destroy()
-                logger.info("P2P peer stopped")
-            except Exception as e:
-                logger.error(f"Error stopping P2P peer: {e}")
-            finally:
-                self.peer = None
diff --git a/libs/python/agent/agent/proxy/server.py b/libs/python/agent/agent/proxy/server.py
deleted file mode 100644
index 2530a096..00000000
--- a/libs/python/agent/agent/proxy/server.py
+++ /dev/null
@@ -1,176 +0,0 @@
-"""
-Proxy server implementation supporting both HTTP and P2P connections.
-"""
-
-import asyncio
-import json
-import logging
-from typing import Dict, Any, Optional
-from starlette.applications import Starlette
-from starlette.routing import Route
-from starlette.requests import Request
-from starlette.responses import JSONResponse
-from starlette.middleware import Middleware
-from starlette.middleware.cors import CORSMiddleware
-import uvicorn
-
-from .handlers import ResponsesHandler
-
-logger = logging.getLogger(__name__)
-
-
-class ProxyServer:
-    """Proxy server that can serve over HTTP and P2P."""
-
-    def __init__(self, host: str = "0.0.0.0", port: int = 8000):
-        self.host = host
-        self.port = port
-        self.handler = ResponsesHandler()
-        self.app = self._create_app()
-
-    def _create_app(self) -> Starlette:
-        """Create Starlette application with routes."""
-
-        async def responses_endpoint(request: Request) -> JSONResponse:
-            """Handle POST /responses requests."""
-            try:
-                # Parse JSON body
-                body = await request.body()
-                request_data = json.loads(body.decode('utf-8'))
-
-                # Process the request
-                result = await self.handler.process_request(request_data)
-
-                return JSONResponse(result)
-
-            except json.JSONDecodeError:
-                return JSONResponse({
-                    "success": False,
-                    "error": "Invalid JSON in request body"
-                }, status_code=400)
-            except Exception as e:
-                logger.error(f"Error in responses endpoint: {e}")
-                return JSONResponse({
-                    "success": False,
-                    "error": str(e)
-                }, status_code=500)
-
-        async def health_endpoint(request: Request) -> JSONResponse:
-            """Health check endpoint."""
-            return JSONResponse({"status": "healthy"})
-
-        routes = [
-            Route("/responses", responses_endpoint, methods=["POST"]),
-            Route("/health", health_endpoint, methods=["GET"]),
-        ]
-
-        middleware = [
-            Middleware(CORSMiddleware,
-                       allow_origins=["*"],
-                       allow_credentials=True,
-                       allow_methods=["*"],
-                       allow_headers=["*"])
-        ]
-
-        return Starlette(routes=routes, middleware=middleware)
-
-    async def start_http(self):
-        """Start HTTP server."""
-        # logger.info(f"Starting HTTP server on {self.host}:{self.port}")
-        print(f"Agent proxy started at http://{self.host}:{self.port}")
-        config = uvicorn.Config(
-            self.app,
-            host=self.host,
-            port=self.port,
-            log_level="info"
-        )
-        server = uvicorn.Server(config)
-        await server.serve()
-
-    async def start_p2p(self, peer_id: Optional[str] = None, signaling_server: Optional[Dict[str, Any]] = None):
-        """Start P2P server using peerjs-python."""
-        try:
-            from .p2p_server import P2PServer
-
-            p2p_server = P2PServer(
-                handler=self.handler,
-                peer_id=peer_id,
-                signaling_server=signaling_server
-            )
-            await p2p_server.start()
-
-        except ImportError:
-            logger.error("P2P dependencies not available. Install peerjs-python for P2P support.")
-            raise
-
-    async def start(self, mode: str = "http", **kwargs):
-        """
-        Start the server in specified mode.
-
-        Args:
-            mode: "http", "p2p", or "both"
-            **kwargs: Additional arguments for specific modes
-        """
-        if mode == "http":
-            await self.start_http()
-        elif mode == "p2p":
-            await self.start_p2p(**kwargs)
-        elif mode == "both":
-            # Start both HTTP and P2P servers concurrently
-            tasks = [
-                asyncio.create_task(self.start_http()),
-                asyncio.create_task(self.start_p2p(**kwargs))
-            ]
-            await asyncio.gather(*tasks)
-        else:
-            raise ValueError(f"Invalid mode: {mode}. Must be 'http', 'p2p', or 'both'")
-
-    async def cleanup(self):
-        """Clean up resources."""
-        await self.handler.cleanup()
-
-
-async def main():
-    """Main entry point for running the proxy server."""
-    import argparse
-
-    parser = argparse.ArgumentParser(description="ComputerAgent Proxy Server")
-    parser.add_argument("--host", default="0.0.0.0", help="Host to bind to")
-    parser.add_argument("--port", type=int, default=8000, help="Port to bind to")
-    parser.add_argument("--mode", choices=["http", "p2p", "both"], default="http",
-                        help="Server mode")
-    parser.add_argument("--peer-id", help="Peer ID for P2P mode")
-    parser.add_argument("--signaling-host", default="0.peerjs.com", help="Signaling server host")
-    parser.add_argument("--signaling-port", type=int, default=443, help="Signaling server port")
-    parser.add_argument("--signaling-secure", action="store_true", help="Use secure signaling")
-
-    args = parser.parse_args()
-
-    # Set up logging
-    logging.basicConfig(level=logging.INFO)
-
-    # Create server
-    server = ProxyServer(host=args.host, port=args.port)
-
-    # Prepare P2P kwargs if needed
-    p2p_kwargs = {}
-    if args.mode in ["p2p", "both"]:
-        p2p_kwargs = {
-            "peer_id": args.peer_id,
-            "signaling_server": {
-                "host": args.signaling_host,
-                "port": args.signaling_port,
-                "secure": args.signaling_secure
-            }
-        }
-
-    try:
-        await server.start(mode=args.mode, **p2p_kwargs)
-    except KeyboardInterrupt:
-        logger.info("Shutting down server...")
-    finally:
-        await server.cleanup()
-
-
-if __name__ == "__main__":
-    asyncio.run(main())