Changed /responses endpoint to be standalone

This commit is contained in:
Dillon DuPont
2025-08-26 10:24:47 -04:00
parent 5db20b9995
commit cdc2b58d94

View File

@@ -440,24 +440,21 @@ async def agent_response_endpoint(
api_key: Optional[str] = Header(None, alias="X-API-Key"),
):
"""
Run a ComputerAgent step using server-side handlers as a tool.
Minimal proxy to run ComputerAgent for up to 2 turns.
Security:
- If CONTAINER_NAME is set on the server, require X-API-Key
and validate using AuthenticationManager.
- If CUA_ENABLE_PUBLIC_PROXY is set, allow public access.
and validate using AuthenticationManager unless CUA_ENABLE_PUBLIC_PROXY is true.
Body JSON:
{
"model": "...", # required
"input": "... or messages[]", # required
"agent_kwargs": { ... }, # optional, will be merged; tools will be overridden
"computer_kwargs": { ... }, # optional, ignored for this endpoint
"agent_kwargs": { ... }, # optional, passed directly to ComputerAgent
"env": { ... } # optional env overrides for agent
}
"""
from agent.proxy.handlers import ResponsesHandler
from agent.computers import AsyncComputerHandler
from agent.agent import ComputerAgent
# Authenticate via AuthenticationManager if running in cloud (CONTAINER_NAME set)
container_name = os.environ.get("CONTAINER_NAME")
@@ -481,98 +478,185 @@ async def agent_response_endpoint(
if not model or input_data is None:
raise HTTPException(status_code=400, detail="'model' and 'input' are required")
agent_kwargs = body.get("agent_kwargs") or {}
env_overrides = body.get("env") or {}
agent_kwargs: Dict[str, Any] = body.get("agent_kwargs") or {}
env_overrides: Dict[str, str] = body.get("env") or {}
# Simple env override context
class _EnvOverride:
def __init__(self, overrides: Dict[str, str]):
self.overrides = overrides
self._original: Dict[str, Optional[str]] = {}
def __enter__(self):
for k, v in (self.overrides or {}).items():
self._original[k] = os.environ.get(k)
os.environ[k] = str(v)
def __exit__(self, exc_type, exc, tb):
for k, old in self._original.items():
if old is None:
os.environ.pop(k, None)
else:
os.environ[k] = old
# Convert input to messages
def _to_messages(data: Union[str, List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
if isinstance(data, str):
return [{"role": "user", "content": data}]
if isinstance(data, list):
return data
messages = _to_messages(input_data)
# Define a direct computer tool that implements the AsyncComputerHandler protocol
# and delegates to our existing automation/file/accessibility handlers.
from agent.computers import AsyncComputerHandler # runtime-checkable Protocol
class DirectComputer(AsyncComputerHandler):
def __init__(self):
# use module-scope handler singletons created by HandlerFactory
self._auto = automation_handler
self._file = file_handler
self._access = accessibility_handler
# Local AsyncComputerHandler implementation backed by automation_handler
class DirectComputerHandler(AsyncComputerHandler):
async def get_environment(self) -> Literal["windows", "mac", "linux", "browser"]:
sys = platform.system().lower()
if "darwin" in sys or sys == "macos" or sys == "mac":
if "darwin" in sys or sys in ("macos", "mac"):
return "mac"
if "windows" in sys:
return "windows"
return "linux"
async def get_dimensions(self) -> tuple[int, int]:
try:
res = await automation_handler.get_screen_size()
size = res.get("size") or {}
return int(size.get("width", 0)), int(size.get("height", 0))
except Exception:
return (0, 0)
size = await self._auto.get_screen_size()
return size["width"], size["height"]
async def screenshot(self) -> str:
res = await automation_handler.screenshot()
if not res.get("success"):
raise RuntimeError(res.get("error", "screenshot failed"))
return res.get("image_data", "")
img_b64 = await self._auto.screenshot()
return img_b64["image_data"]
async def click(self, x: int, y: int, button: str = "left") -> None:
if button == "right":
await automation_handler.right_click(x, y)
if button == "left":
await self._auto.left_click(x, y)
elif button == "right":
await self._auto.right_click(x, y)
else:
await automation_handler.left_click(x, y)
await self._auto.left_click(x, y)
async def double_click(self, x: int, y: int) -> None:
await automation_handler.double_click(x, y)
await self._auto.double_click(x, y)
async def scroll(self, x: int, y: int, scroll_x: int, scroll_y: int) -> None:
try:
if scroll_y:
await automation_handler.scroll(scroll_y, 0)
if scroll_x:
await automation_handler.scroll(0, scroll_x)
except Exception:
await automation_handler.scroll(scroll_y or 0, scroll_x or 0)
await self._auto.move_cursor(x, y)
await self._auto.scroll(scroll_x, scroll_y)
async def type(self, text: str) -> None:
await automation_handler.type_text(text)
await self._auto.type_text(text)
async def wait(self, ms: int = 1000) -> None:
await asyncio.sleep(ms / 1000.0)
async def move(self, x: int, y: int) -> None:
await automation_handler.move_cursor(x, y)
await self._auto.move_cursor(x, y)
async def keypress(self, keys: Union[List[str], str]) -> None:
if isinstance(keys, list):
if len(keys) <= 1:
key = keys[0] if keys else ""
if key:
await automation_handler.press_key(key)
else:
await cast(Any, automation_handler).hotkey([str(k) for k in keys])
elif isinstance(keys, str):
await automation_handler.press_key(keys)
if isinstance(keys, str):
parts = keys.replace("-", "+").split("+")
else:
parts = keys
if len(parts) == 1:
await self._auto.press_key(parts[0])
else:
await self._auto.hotkey(*parts)
async def drag(self, path: List[Dict[str, int]]) -> None:
await automation_handler.drag([(p["x"], p["y"]) for p in path])
if not path:
return
start = path[0]
await self._auto.mouse_down(start["x"], start["y"])
for pt in path[1:]:
await self._auto.move_cursor(pt["x"], pt["y"])
end = path[-1]
await self._auto.mouse_up(end["x"], end["y"])
async def get_current_url(self) -> str:
# Not available in this server context
return ""
async def left_mouse_down(self, x: Optional[int] = None, y: Optional[int] = None) -> None:
await automation_handler.mouse_down(x, y, button="left")
await self._auto.mouse_down(x, y, button="left")
async def left_mouse_up(self, x: Optional[int] = None, y: Optional[int] = None) -> None:
await automation_handler.mouse_up(x, y, button="left")
await self._auto.mouse_up(x, y, button="left")
custom_handler = DirectComputerHandler()
# # Inline image URLs to base64
# import base64, mimetypes, requests
# # Use a browser-like User-Agent to avoid 403s from some CDNs (e.g., Wikimedia)
# HEADERS = {
# "User-Agent": (
# "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
# "AppleWebKit/537.36 (KHTML, like Gecko) "
# "Chrome/124.0.0.0 Safari/537.36"
# )
# }
# def _to_data_url(content_bytes: bytes, url: str, resp: requests.Response) -> str:
# ctype = resp.headers.get("Content-Type") or mimetypes.guess_type(url)[0] or "application/octet-stream"
# b64 = base64.b64encode(content_bytes).decode("utf-8")
# return f"data:{ctype};base64,{b64}"
# def inline_image_urls(messages):
# # messages: List[{"role": "...","content":[...]}]
# out = []
# for m in messages:
# if not isinstance(m.get("content"), list):
# out.append(m)
# continue
# new_content = []
# for part in (m.get("content") or []):
# if part.get("type") == "input_image" and (url := part.get("image_url")):
# resp = requests.get(url, headers=HEADERS, timeout=30)
# resp.raise_for_status()
# new_content.append({
# "type": "input_image",
# "image_url": _to_data_url(resp.content, url, resp)
# })
# else:
# new_content.append(part)
# out.append({**m, "content": new_content})
# return out
# messages = inline_image_urls(messages)
# Prepare request for ResponsesHandler and force our tool
rh = ResponsesHandler()
request_payload: Dict[str, Any] = {
with _EnvOverride(env_overrides):
# Prepare tools: if caller did not pass tools, inject our DirectComputer
tools = agent_kwargs.get("tools")
if not tools:
tools = [DirectComputer()]
agent_kwargs = {**agent_kwargs, "tools": tools}
# Instantiate agent with our tools
agent = ComputerAgent(model=model, **agent_kwargs) # type: ignore[arg-type]
total_output: List[Any] = []
total_usage: Dict[str, Any] = {}
turns = 0
async for result in agent.run(messages):
total_output += result["output"]
# Try to collect usage if present
if isinstance(result, dict) and "usage" in result and isinstance(result["usage"], dict):
# Merge usage counters
for k, v in result["usage"].items():
if isinstance(v, (int, float)):
total_usage[k] = total_usage.get(k, 0) + v
else:
total_usage[k] = v
turns += 1
if turns > 2:
break
return {
"success": True,
"model": model,
"input": input_data,
"agent_kwargs": {**agent_kwargs, "tools": [custom_handler]},
# Don't need computer_kwargs; agent will use our tool instead
"env": env_overrides,
"output": total_output,
"usage": total_usage,
}
result = await rh.process_request(request_payload)
return result
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)