From d8b1792cc57e28071d932237e6608405b71192b0 Mon Sep 17 00:00:00 2001 From: Dillon DuPont Date: Mon, 11 Aug 2025 15:24:56 -0400 Subject: [PATCH] Added human/ provider to liteLLM --- .../agent/agent/adapters/human_adapter.py | 348 ++++++++++ libs/python/agent/agent/agent.py | 9 +- .../python/agent/agent/human_tool/__init__.py | 29 + .../python/agent/agent/human_tool/__main__.py | 38 ++ libs/python/agent/agent/human_tool/server.py | 234 +++++++ libs/python/agent/agent/human_tool/ui.py | 630 ++++++++++++++++++ 6 files changed, 1286 insertions(+), 2 deletions(-) create mode 100644 libs/python/agent/agent/adapters/human_adapter.py create mode 100644 libs/python/agent/agent/human_tool/__init__.py create mode 100644 libs/python/agent/agent/human_tool/__main__.py create mode 100644 libs/python/agent/agent/human_tool/server.py create mode 100644 libs/python/agent/agent/human_tool/ui.py diff --git a/libs/python/agent/agent/adapters/human_adapter.py b/libs/python/agent/agent/adapters/human_adapter.py new file mode 100644 index 00000000..0cd4fe02 --- /dev/null +++ b/libs/python/agent/agent/adapters/human_adapter.py @@ -0,0 +1,348 @@ +import os +import asyncio +import requests +from typing import List, Dict, Any, Iterator, AsyncIterator +from litellm.types.utils import GenericStreamingChunk, ModelResponse +from litellm.llms.custom_llm import CustomLLM +from litellm import completion, acompletion + + +class HumanAdapter(CustomLLM): + """Human Adapter for human-in-the-loop completions. + + This adapter sends completion requests to a human completion server + where humans can review and respond to AI requests. + """ + + def __init__(self, base_url: str | None = None, timeout: float = 300.0, **kwargs): + """Initialize the human adapter. + + Args: + base_url: Base URL for the human completion server. + Defaults to HUMAN_BASE_URL environment variable or http://localhost:8002 + timeout: Timeout in seconds for waiting for human response + **kwargs: Additional arguments + """ + super().__init__() + self.base_url = base_url or os.getenv('HUMAN_BASE_URL', 'http://localhost:8002') + self.timeout = timeout + + # Ensure base_url doesn't end with slash + self.base_url = self.base_url.rstrip('/') + + def _queue_completion(self, messages: List[Dict[str, Any]], model: str) -> str: + """Queue a completion request and return the call ID. + + Args: + messages: Messages in OpenAI format + model: Model name + + Returns: + Call ID for tracking the request + + Raises: + Exception: If queueing fails + """ + try: + response = requests.post( + f"{self.base_url}/queue", + json={"messages": messages, "model": model}, + timeout=10 + ) + response.raise_for_status() + return response.json()["id"] + except requests.RequestException as e: + raise Exception(f"Failed to queue completion request: {e}") + + def _wait_for_completion(self, call_id: str) -> Dict[str, Any]: + """Wait for human to complete the call. + + Args: + call_id: ID of the queued completion call + + Returns: + Dict containing response and/or tool_calls + + Raises: + TimeoutError: If timeout is exceeded + Exception: If completion fails + """ + import time + + start_time = time.time() + + while True: + try: + # Check status + status_response = requests.get(f"{self.base_url}/status/{call_id}") + status_response.raise_for_status() + status_data = status_response.json() + + if status_data["status"] == "completed": + result = {} + if "response" in status_data and status_data["response"]: + result["response"] = status_data["response"] + if "tool_calls" in status_data and status_data["tool_calls"]: + result["tool_calls"] = status_data["tool_calls"] + return result + elif status_data["status"] == "failed": + error_msg = status_data.get("error", "Unknown error") + raise Exception(f"Completion failed: {error_msg}") + + # Check timeout + if time.time() - start_time > self.timeout: + raise TimeoutError(f"Timeout waiting for human response after {self.timeout} seconds") + + # Wait before checking again + time.sleep(1.0) + + except requests.RequestException as e: + if time.time() - start_time > self.timeout: + raise TimeoutError(f"Timeout waiting for human response: {e}") + # Continue trying if we haven't timed out + time.sleep(1.0) + + async def _async_wait_for_completion(self, call_id: str) -> Dict[str, Any]: + """Async version of wait_for_completion. + + Args: + call_id: ID of the queued completion call + + Returns: + Dict containing response and/or tool_calls + + Raises: + TimeoutError: If timeout is exceeded + Exception: If completion fails + """ + import aiohttp + import time + + start_time = time.time() + + async with aiohttp.ClientSession() as session: + while True: + try: + # Check status + async with session.get(f"{self.base_url}/status/{call_id}") as response: + response.raise_for_status() + status_data = await response.json() + + if status_data["status"] == "completed": + result = {} + if "response" in status_data and status_data["response"]: + result["response"] = status_data["response"] + if "tool_calls" in status_data and status_data["tool_calls"]: + result["tool_calls"] = status_data["tool_calls"] + return result + elif status_data["status"] == "failed": + error_msg = status_data.get("error", "Unknown error") + raise Exception(f"Completion failed: {error_msg}") + + # Check timeout + if time.time() - start_time > self.timeout: + raise TimeoutError(f"Timeout waiting for human response after {self.timeout} seconds") + + # Wait before checking again + await asyncio.sleep(1.0) + + except Exception as e: + if time.time() - start_time > self.timeout: + raise TimeoutError(f"Timeout waiting for human response: {e}") + # Continue trying if we haven't timed out + await asyncio.sleep(1.0) + + def _generate_response(self, messages: List[Dict[str, Any]], model: str) -> Dict[str, Any]: + """Generate a human response for the given messages. + + Args: + messages: Messages in OpenAI format + model: Model name + + Returns: + Dict containing response and/or tool_calls + """ + # Queue the completion request + call_id = self._queue_completion(messages, model) + + # Wait for human response + response = self._wait_for_completion(call_id) + + return response + + async def _async_generate_response(self, messages: List[Dict[str, Any]], model: str) -> Dict[str, Any]: + """Async version of _generate_response. + + Args: + messages: Messages in OpenAI format + model: Model name + + Returns: + Dict containing response and/or tool_calls + """ + # Queue the completion request (sync operation) + call_id = self._queue_completion(messages, model) + + # Wait for human response (async) + response = await self._async_wait_for_completion(call_id) + + return response + + def completion(self, *args, **kwargs) -> ModelResponse: + """Synchronous completion method. + + Returns: + ModelResponse with human-generated text or tool calls + """ + messages = kwargs.get('messages', []) + model = kwargs.get('model', 'human') + + # Generate human response + human_response_data = self._generate_response(messages, model) + + # Create ModelResponse with proper structure + from litellm.types.utils import ModelResponse, Choices, Message + import uuid + import time + + # Create message content based on response type + if "tool_calls" in human_response_data and human_response_data["tool_calls"]: + # Tool calls response + message = Message( + role="assistant", + content=human_response_data.get("response", ""), + tool_calls=human_response_data["tool_calls"] + ) + else: + # Text response + message = Message( + role="assistant", + content=human_response_data.get("response", "") + ) + + choice = Choices( + finish_reason="stop", + index=0, + message=message + ) + + result = ModelResponse( + id=f"human-{uuid.uuid4()}", + choices=[choice], + created=int(time.time()), + model=f"human/{model}", + object="chat.completion" + ) + + return result + + async def acompletion(self, *args, **kwargs) -> ModelResponse: + """Asynchronous completion method. + + Returns: + ModelResponse with human-generated text or tool calls + """ + messages = kwargs.get('messages', []) + model = kwargs.get('model', 'human') + + # Generate human response + human_response_data = await self._async_generate_response(messages, model) + + # Create ModelResponse with proper structure + from litellm.types.utils import ModelResponse, Choices, Message + import uuid + import time + + # Create message content based on response type + if "tool_calls" in human_response_data and human_response_data["tool_calls"]: + # Tool calls response + message = Message( + role="assistant", + content=human_response_data.get("response", ""), + tool_calls=human_response_data["tool_calls"] + ) + else: + # Text response + message = Message( + role="assistant", + content=human_response_data.get("response", "") + ) + + choice = Choices( + finish_reason="stop", + index=0, + message=message + ) + + result = ModelResponse( + id=f"human-{uuid.uuid4()}", + choices=[choice], + created=int(time.time()), + model=f"human/{model}", + object="chat.completion" + ) + + return result + + def streaming(self, *args, **kwargs) -> Iterator[GenericStreamingChunk]: + """Synchronous streaming method. + + Yields: + Streaming chunks with human-generated text or tool calls + """ + messages = kwargs.get('messages', []) + model = kwargs.get('model', 'human') + + # Generate human response + human_response_data = self._generate_response(messages, model) + + import time + + # Handle tool calls vs text response + if "tool_calls" in human_response_data and human_response_data["tool_calls"]: + # Stream tool calls as a single chunk + generic_chunk: GenericStreamingChunk = { + "finish_reason": "tool_calls", + "index": 0, + "is_finished": True, + "text": human_response_data.get("response", ""), + "tool_use": human_response_data["tool_calls"], + "usage": {"completion_tokens": 1, "prompt_tokens": 0, "total_tokens": 1}, + } + yield generic_chunk + else: + # Stream text response + response_text = human_response_data.get("response", "") + generic_chunk: GenericStreamingChunk = { + "finish_reason": "stop", + "index": 0, + "is_finished": True, + "text": response_text, + "tool_use": None, + "usage": {"completion_tokens": len(response_text.split()), "prompt_tokens": 0, "total_tokens": len(response_text.split())}, + } + yield generic_chunk + + async def astreaming(self, *args, **kwargs) -> AsyncIterator[GenericStreamingChunk]: + """Asynchronous streaming method. + + Yields: + Streaming chunks with human-generated text or tool calls + """ + messages = kwargs.get('messages', []) + model = kwargs.get('model', 'human') + + # Generate human response + human_response = await self._async_generate_response(messages, model) + + # Return as single streaming chunk + generic_streaming_chunk: GenericStreamingChunk = { + "finish_reason": "stop", + "index": 0, + "is_finished": True, + "text": human_response, + "tool_use": None, + "usage": {"completion_tokens": len(human_response.split()), "prompt_tokens": 0, "total_tokens": len(human_response.split())}, + } + + yield generic_streaming_chunk \ No newline at end of file diff --git a/libs/python/agent/agent/agent.py b/libs/python/agent/agent/agent.py index 14bd92aa..7f30166f 100644 --- a/libs/python/agent/agent/agent.py +++ b/libs/python/agent/agent/agent.py @@ -13,7 +13,10 @@ import json import litellm import litellm.utils import inspect -from .adapters import HuggingFaceLocalAdapter +from .adapters import ( + HuggingFaceLocalAdapter, + HumanAdapter, +) from .callbacks import ( ImageRetentionCallback, LoggingCallback, @@ -215,8 +218,10 @@ class ComputerAgent: hf_adapter = HuggingFaceLocalAdapter( device="auto" ) + human_adapter = HumanAdapter() litellm.custom_provider_map = [ - {"provider": "huggingface-local", "custom_handler": hf_adapter} + {"provider": "huggingface-local", "custom_handler": hf_adapter}, + {"provider": "human", "custom_handler": human_adapter} ] litellm.suppress_debug_info = True diff --git a/libs/python/agent/agent/human_tool/__init__.py b/libs/python/agent/agent/human_tool/__init__.py new file mode 100644 index 00000000..f57fb305 --- /dev/null +++ b/libs/python/agent/agent/human_tool/__init__.py @@ -0,0 +1,29 @@ +""" +Human-in-the-Loop Completion Tool + +This package provides a human-in-the-loop completion system that allows +AI agents to request human assistance for complex decisions or responses. + +Components: +- server.py: FastAPI server with completion queue management +- ui.py: Gradio UI for human interaction +- __main__.py: Combined server and UI application + +Usage: + # Run the server and UI + python -m agent.human_tool + + # Or run components separately + python -m agent.human_tool.server # API server only + python -m agent.human_tool.ui # UI only +""" + +from .server import CompletionQueue, completion_queue +from .ui import HumanCompletionUI, create_ui + +__all__ = [ + "CompletionQueue", + "completion_queue", + "HumanCompletionUI", + "create_ui" +] diff --git a/libs/python/agent/agent/human_tool/__main__.py b/libs/python/agent/agent/human_tool/__main__.py new file mode 100644 index 00000000..e1ceed50 --- /dev/null +++ b/libs/python/agent/agent/human_tool/__main__.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python3 +""" +Human-in-the-Loop Completion Server and UI + +This module combines the FastAPI server for handling completion requests +with a Gradio UI for human interaction. +""" + +import gradio as gr +from fastapi import FastAPI +from .server import app as fastapi_app +from .ui import create_ui + +# Create the Gradio demo +gradio_demo = create_ui() + +# Mount Gradio on FastAPI +CUSTOM_PATH = "/gradio" +app = gr.mount_gradio_app(fastapi_app, gradio_demo, path=CUSTOM_PATH) + +# Add a redirect from root to Gradio UI +@fastapi_app.get("/") +async def redirect_to_ui(): + """Redirect root to Gradio UI.""" + return { + "message": "Human Completion Server is running", + "ui_url": "/gradio", + "api_docs": "/docs" + } + +if __name__ == "__main__": + import uvicorn + print("🚀 Starting Human-in-the-Loop Completion Server...") + print("📊 API Server: http://localhost:8002") + print("🎨 Gradio UI: http://localhost:8002/gradio") + print("📚 API Docs: http://localhost:8002/docs") + + uvicorn.run(app, host="0.0.0.0", port=8002) diff --git a/libs/python/agent/agent/human_tool/server.py b/libs/python/agent/agent/human_tool/server.py new file mode 100644 index 00000000..c5d08cfe --- /dev/null +++ b/libs/python/agent/agent/human_tool/server.py @@ -0,0 +1,234 @@ +import asyncio +import uuid +from datetime import datetime +from typing import Dict, List, Any, Optional +from dataclasses import dataclass, asdict +from enum import Enum + +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel + + +class CompletionStatus(str, Enum): + PENDING = "pending" + COMPLETED = "completed" + FAILED = "failed" + + +@dataclass +class CompletionCall: + id: str + messages: List[Dict[str, Any]] + model: str + status: CompletionStatus + created_at: datetime + completed_at: Optional[datetime] = None + response: Optional[str] = None + tool_calls: Optional[List[Dict[str, Any]]] = None + error: Optional[str] = None + + +class ToolCall(BaseModel): + id: str + type: str = "function" + function: Dict[str, Any] + + +class CompletionRequest(BaseModel): + messages: List[Dict[str, Any]] + model: str + + +class CompletionResponse(BaseModel): + response: Optional[str] = None + tool_calls: Optional[List[Dict[str, Any]]] = None + + +class CompletionQueue: + def __init__(self): + self._queue: Dict[str, CompletionCall] = {} + self._pending_order: List[str] = [] + self._lock = asyncio.Lock() + + async def add_completion(self, messages: List[Dict[str, Any]], model: str) -> str: + """Add a completion call to the queue.""" + async with self._lock: + call_id = str(uuid.uuid4()) + completion_call = CompletionCall( + id=call_id, + messages=messages, + model=model, + status=CompletionStatus.PENDING, + created_at=datetime.now() + ) + self._queue[call_id] = completion_call + self._pending_order.append(call_id) + return call_id + + async def get_pending_calls(self) -> List[Dict[str, Any]]: + """Get all pending completion calls.""" + async with self._lock: + pending_calls = [] + for call_id in self._pending_order: + if call_id in self._queue and self._queue[call_id].status == CompletionStatus.PENDING: + call = self._queue[call_id] + pending_calls.append({ + "id": call.id, + "model": call.model, + "created_at": call.created_at.isoformat(), + "messages": call.messages + }) + return pending_calls + + async def get_call_status(self, call_id: str) -> Optional[Dict[str, Any]]: + """Get the status of a specific completion call.""" + async with self._lock: + if call_id not in self._queue: + return None + + call = self._queue[call_id] + result = { + "id": call.id, + "status": call.status.value, + "created_at": call.created_at.isoformat(), + "model": call.model, + "messages": call.messages + } + + if call.completed_at: + result["completed_at"] = call.completed_at.isoformat() + if call.response: + result["response"] = call.response + if call.tool_calls: + result["tool_calls"] = call.tool_calls + if call.error: + result["error"] = call.error + + return result + + async def complete_call(self, call_id: str, response: Optional[str] = None, tool_calls: Optional[List[Dict[str, Any]]] = None) -> bool: + """Mark a completion call as completed with a response or tool calls.""" + async with self._lock: + if call_id not in self._queue: + return False + + call = self._queue[call_id] + if call.status != CompletionStatus.PENDING: + return False + + call.status = CompletionStatus.COMPLETED + call.completed_at = datetime.now() + call.response = response + call.tool_calls = tool_calls + + # Remove from pending order + if call_id in self._pending_order: + self._pending_order.remove(call_id) + + return True + + async def fail_call(self, call_id: str, error: str) -> bool: + """Mark a completion call as failed with an error.""" + async with self._lock: + if call_id not in self._queue: + return False + + call = self._queue[call_id] + if call.status != CompletionStatus.PENDING: + return False + + call.status = CompletionStatus.FAILED + call.completed_at = datetime.now() + call.error = error + + # Remove from pending order + if call_id in self._pending_order: + self._pending_order.remove(call_id) + + return True + + async def wait_for_completion(self, call_id: str, timeout: float = 300.0) -> Optional[str]: + """Wait for a completion call to be completed and return the response.""" + start_time = asyncio.get_event_loop().time() + + while True: + status = await self.get_call_status(call_id) + if not status: + return None + + if status["status"] == CompletionStatus.COMPLETED.value: + return status.get("response") + elif status["status"] == CompletionStatus.FAILED.value: + raise Exception(f"Completion failed: {status.get('error', 'Unknown error')}") + + # Check timeout + if asyncio.get_event_loop().time() - start_time > timeout: + await self.fail_call(call_id, "Timeout waiting for human response") + raise TimeoutError("Timeout waiting for human response") + + # Wait a bit before checking again + await asyncio.sleep(0.5) + + +# Global queue instance +completion_queue = CompletionQueue() + +# FastAPI app +app = FastAPI(title="Human Completion Server", version="1.0.0") + + +@app.post("/queue", response_model=Dict[str, str]) +async def queue_completion(request: CompletionRequest): + """Add a completion request to the queue.""" + call_id = await completion_queue.add_completion(request.messages, request.model) + return {"id": call_id, "status": "queued"} + + +@app.get("/pending") +async def list_pending(): + """List all pending completion calls.""" + pending_calls = await completion_queue.get_pending_calls() + return {"pending_calls": pending_calls} + + +@app.get("/status/{call_id}") +async def get_status(call_id: str): + """Get the status of a specific completion call.""" + status = await completion_queue.get_call_status(call_id) + if not status: + raise HTTPException(status_code=404, detail="Completion call not found") + return status + + +@app.post("/complete/{call_id}") +async def complete_call(call_id: str, response: CompletionResponse): + """Complete a call with a human response.""" + success = await completion_queue.complete_call( + call_id, + response=response.response, + tool_calls=response.tool_calls + ) + if success: + return {"status": "success", "message": "Call completed"} + else: + raise HTTPException(status_code=404, detail="Call not found or already completed") + + +@app.post("/fail/{call_id}") +async def fail_call(call_id: str, error: Dict[str, str]): + """Mark a call as failed.""" + success = await completion_queue.fail_call(call_id, error.get("error", "Unknown error")) + if not success: + raise HTTPException(status_code=404, detail="Completion call not found or already completed") + return {"status": "failed"} + + +@app.get("/") +async def root(): + """Root endpoint.""" + return {"message": "Human Completion Server is running"} + + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8002) diff --git a/libs/python/agent/agent/human_tool/ui.py b/libs/python/agent/agent/human_tool/ui.py new file mode 100644 index 00000000..f4a9fb4f --- /dev/null +++ b/libs/python/agent/agent/human_tool/ui.py @@ -0,0 +1,630 @@ +import gradio as gr +import json +import time +from typing import List, Dict, Any, Optional +from datetime import datetime +import requests +from .server import completion_queue +import base64 +import io +from PIL import Image + +class HumanCompletionUI: + def __init__(self, server_url: str = "http://localhost:8002"): + self.server_url = server_url + self.current_call_id: Optional[str] = None + self.refresh_interval = 2.0 # seconds + self.last_image = None # Store the last image for display + + def format_messages_for_chatbot(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Format messages for display in gr.Chatbot with type='messages'.""" + formatted = [] + for msg in messages: + role = msg.get("role", "user") + content = msg.get("content", "") + tool_calls = msg.get("tool_calls", []) + + # Handle different content formats + if isinstance(content, list): + # Multi-modal content - can include text and images + formatted_content = [] + for item in content: + if item.get("type") == "text": + text = item.get("text", "") + if text.strip(): # Only add non-empty text + formatted_content.append(text) + elif item.get("type") == "image_url": + image_url = item.get("image_url", {}).get("url", "") + if image_url: + # Check if it's a base64 image or URL + if image_url.startswith("data:image"): + # For base64 images, decode and create gr.Image + try: + header, data = image_url.split(",", 1) + image_data = base64.b64decode(data) + image = Image.open(io.BytesIO(image_data)) + formatted_content.append(gr.Image(value=image)) + except Exception as e: + print(f"Error loading image: {e}") + formatted_content.append(f"[Image loading error: {e}]") + else: + # For URL images, create gr.Image with URL + formatted_content.append(gr.Image(value=image_url)) + + # Determine final content format + if len(formatted_content) == 1: + content = formatted_content[0] + elif len(formatted_content) > 1: + content = formatted_content + else: + content = "[Empty content]" + + # Ensure role is valid for Gradio Chatbot + if role not in ["user", "assistant"]: + role = "assistant" if role == "system" else "user" + + # Invert roles for better display in human UI context + # (what the AI says becomes "user", what human should respond becomes "assistant") + if role == "user": + role = "assistant" + else: + role = "user" + + # Add the main message if it has content + if content and str(content).strip(): + formatted.append({"role": role, "content": content}) + + # Handle tool calls - create separate messages for each tool call + if tool_calls: + for tool_call in tool_calls: + function_name = tool_call.get("function", {}).get("name", "unknown") + arguments_str = tool_call.get("function", {}).get("arguments", "{}") + + try: + # Parse arguments to format them nicely + arguments = json.loads(arguments_str) + formatted_args = json.dumps(arguments, indent=2) + except json.JSONDecodeError: + # If parsing fails, use the raw string + formatted_args = arguments_str + + # Create a formatted message for the tool call + tool_call_content = f"```json\n{formatted_args}\n```" + + formatted.append({ + "role": role, + "content": tool_call_content, + "metadata": {"title": f"🛠️ Used {function_name}"} + }) + + return formatted + + def get_pending_calls(self) -> List[Dict[str, Any]]: + """Get pending calls from the server.""" + try: + response = requests.get(f"{self.server_url}/pending", timeout=5) + if response.status_code == 200: + return response.json().get("pending_calls", []) + except Exception as e: + print(f"Error fetching pending calls: {e}") + return [] + + def complete_call_with_response(self, call_id: str, response: str) -> bool: + """Complete a call with a text response.""" + try: + response_data = {"response": response} + response_obj = requests.post( + f"{self.server_url}/complete/{call_id}", + json=response_data, + timeout=10 + ) + response_obj.raise_for_status() + return True + except requests.RequestException as e: + print(f"Error completing call: {e}") + return False + + def complete_call_with_tool_calls(self, call_id: str, tool_calls: List[Dict[str, Any]]) -> bool: + """Complete a call with tool calls.""" + try: + response_data = {"tool_calls": tool_calls} + response_obj = requests.post( + f"{self.server_url}/complete/{call_id}", + json=response_data, + timeout=10 + ) + response_obj.raise_for_status() + return True + except requests.RequestException as e: + print(f"Error completing call: {e}") + return False + + def complete_call(self, call_id: str, response: Optional[str] = None, tool_calls: Optional[List[Dict[str, Any]]] = None) -> bool: + """Complete a call with either a response or tool calls.""" + try: + response_data = {} + if response: + response_data["response"] = response + if tool_calls: + response_data["tool_calls"] = tool_calls + + response_obj = requests.post( + f"{self.server_url}/complete/{call_id}", + json=response_data, + timeout=10 + ) + response_obj.raise_for_status() + return True + except requests.RequestException as e: + print(f"Error completing call: {e}") + return False + + def get_last_image_from_messages(self, messages: List[Dict[str, Any]]) -> Optional[Any]: + """Extract the last image from the messages for display above conversation.""" + last_image = None + + for msg in reversed(messages): # Start from the last message + content = msg.get("content", "") + + if isinstance(content, list): + for item in reversed(content): # Get the last image in the message + if item.get("type") == "image_url": + image_url = item.get("image_url", {}).get("url", "") + if image_url: + if image_url.startswith("data:image"): + # For base64 images, create a gr.Image component + try: + header, data = image_url.split(",", 1) + image_data = base64.b64decode(data) + image = Image.open(io.BytesIO(image_data)) + return image + except Exception as e: + print(f"Error loading image: {e}") + continue + else: + # For URL images, return the URL + return image_url + + return last_image + + def refresh_pending_calls(self): + """Refresh the list of pending calls.""" + pending_calls = self.get_pending_calls() + + if not pending_calls: + return ( + gr.update(choices=["latest"], value="latest"), # dropdown + gr.update(value=None), # image (no image) + gr.update(value=[]), # chatbot (empty messages) + gr.update(interactive=False) # submit button + ) + + # Sort pending calls by created_at to get oldest first + sorted_calls = sorted(pending_calls, key=lambda x: x.get("created_at", "")) + + # Create choices for dropdown + choices = [("latest", "latest")] # Add "latest" option first + + for call in sorted_calls: + call_id = call["id"] + model = call.get("model", "unknown") + created_at = call.get("created_at", "") + # Format timestamp + try: + dt = datetime.fromisoformat(created_at.replace('Z', '+00:00')) + time_str = dt.strftime("%H:%M:%S") + except: + time_str = created_at + + choice_label = f"{call_id[:8]}... ({model}) - {time_str}" + choices.append((choice_label, call_id)) + + # Default to "latest" which shows the oldest pending conversation + selected_call_id = "latest" + if selected_call_id == "latest" and sorted_calls: + # Use the oldest call (first in sorted list) + selected_call = sorted_calls[0] + conversation = self.format_messages_for_chatbot(selected_call.get("messages", [])) + self.current_call_id = selected_call["id"] + # Get the last image from messages + self.last_image = self.get_last_image_from_messages(selected_call.get("messages", [])) + else: + conversation = [] + self.current_call_id = None + self.last_image = None + + return ( + gr.update(choices=choices, value="latest"), + gr.update(value=self.last_image), + gr.update(value=conversation), + gr.update(interactive=bool(choices)) + ) + + def on_call_selected(self, selected_choice): + """Handle when a call is selected from the dropdown.""" + if not selected_choice: + return ( + gr.update(value=None), # no image + gr.update(value=[]), # empty chatbot + gr.update(interactive=False) + ) + + pending_calls = self.get_pending_calls() + if not pending_calls: + return ( + gr.update(value=None), # no image + gr.update(value=[]), # empty chatbot + gr.update(interactive=False) + ) + + # Handle "latest" option + if selected_choice == "latest": + # Sort calls by created_at to get oldest first + sorted_calls = sorted(pending_calls, key=lambda x: x.get("created_at", "")) + selected_call = sorted_calls[0] # Get the oldest call + call_id = selected_call["id"] + else: + # Extract call_id from the choice for specific calls + call_id = None + for call in pending_calls: + call_id_short = call["id"][:8] + if call_id_short in selected_choice: + call_id = call["id"] + break + + if not call_id: + return ( + gr.update(value=None), # no image + gr.update(value=[]), # empty chatbot + gr.update(interactive=False) + ) + + # Find the selected call + selected_call = next((c for c in pending_calls if c["id"] == call_id), None) + + if not selected_call: + return ( + gr.update(value=None), # no image + gr.update(value=[]), # empty chatbot + gr.update(interactive=False) + ) + + conversation = self.format_messages_for_chatbot(selected_call.get("messages", [])) + self.current_call_id = call_id + # Get the last image from messages + self.last_image = self.get_last_image_from_messages(selected_call.get("messages", [])) + + return ( + gr.update(value=self.last_image), + gr.update(value=conversation), + gr.update(interactive=True) + ) + + def submit_response(self, response_text: str): + """Submit a text response to the current call.""" + if not self.current_call_id: + return ( + gr.update(value=response_text), # keep response text + gr.update(value="❌ No call selected") # status + ) + + if not response_text.strip(): + return ( + gr.update(value=response_text), # keep response text + gr.update(value="❌ Response cannot be empty") # status + ) + + success = self.complete_call_with_response(self.current_call_id, response_text) + + if success: + status_msg = "✅ Response submitted successfully!" + return ( + gr.update(value=""), # clear response text + gr.update(value=status_msg) # status + ) + else: + return ( + gr.update(value=response_text), # keep response text + gr.update(value="❌ Failed to submit response") # status + ) + + def submit_action(self, action_type: str, **kwargs) -> str: + """Submit a computer action as a tool call.""" + if not self.current_call_id: + return "❌ No call selected" + + import uuid + + # Create tool call structure + action_data = {"type": action_type, **kwargs} + tool_call = { + "id": f"call_{uuid.uuid4().hex[:24]}", + "type": "function", + "function": { + "name": "computer", + "arguments": json.dumps(action_data) + } + } + + success = self.complete_call_with_tool_calls(self.current_call_id, [tool_call]) + + if success: + return f"✅ {action_type.capitalize()} action submitted as tool call" + else: + return f"❌ Failed to submit {action_type} action" + + def submit_click_action(self, x: int, y: int, action_type: str = "click", button: str = "left") -> str: + """Submit a coordinate-based action.""" + if action_type == "click": + return self.submit_action(action_type, x=x, y=y, button=button) + else: + return self.submit_action(action_type, x=x, y=y) + + def submit_type_action(self, text: str) -> str: + """Submit a type action.""" + return self.submit_action("type", text=text) + + def submit_hotkey_action(self, keys: str) -> str: + """Submit a hotkey action.""" + return self.submit_action("keypress", keys=keys) + + def submit_description_click(self, description: str, action_type: str = "click", button: str = "left") -> str: + """Submit a description-based action.""" + if action_type == "click": + return self.submit_action(action_type, element_description=description, button=button) + else: + return self.submit_action(action_type, element_description=description) + + def wait_for_pending_calls(self, max_seconds: float = 10.0, check_interval: float = 0.2): + """Wait for pending calls to appear or until max_seconds elapsed. + + This method loops and checks for pending calls at regular intervals, + returning as soon as a pending call is found or the maximum wait time is reached. + + Args: + max_seconds: Maximum number of seconds to wait + check_interval: How often to check for pending calls (in seconds) + """ + import time + + start_time = time.time() + + while time.time() - start_time < max_seconds: + # Check if there are any pending calls + pending_calls = self.get_pending_calls() + if pending_calls: + # Found pending calls, return immediately + return self.refresh_pending_calls() + + # Wait before checking again + time.sleep(check_interval) + + # Max wait time reached, return current state + return self.refresh_pending_calls() + + +def create_ui(): + """Create the Gradio interface.""" + ui_handler = HumanCompletionUI() + + with gr.Blocks(title="Human-in-the-Loop Agent Tool") as demo: + gr.Markdown("# 🤖 Human-in-the-Loop Agent Tool") + gr.Markdown("Review AI conversation requests and provide human responses.") + + with gr.Row(): + with gr.Column(scale=2): + with gr.Group(): + screenshot_image = gr.Image( + label="Screenshot", + interactive=False, + height=600 + ) + + # Action type selection for image clicks + with gr.Row(): + action_type_radio = gr.Radio( + label="Action Type", + choices=["click", "double_click", "move", "left_mouse_up", "left_mouse_down"], + value="click", + scale=2 + ) + action_button_radio = gr.Radio( + label="Button (for click only)", + choices=["left", "right", "wheel", "back", "forward"], + value="left", + visible=True, + scale=1 + ) + + conversation_chatbot = gr.Chatbot( + label="Messages", + type="messages", + height=500, + show_copy_button=True + ) + + with gr.Column(scale=1): + with gr.Group(): + call_dropdown = gr.Dropdown( + label="Select a pending call", + choices=["latest"], + interactive=True, + value="latest" + ) + refresh_btn = gr.Button("🔄 Refresh", variant="secondary") + + with gr.Group(): + response_text = gr.Textbox( + label="Response", + lines=3, + placeholder="Enter your response here..." + ) + submit_btn = gr.Button("📤 Submit Response", variant="primary", interactive=False) + + # Action Accordions + with gr.Accordion("🖱️ Click Actions", open=False): + with gr.Group(): + with gr.Row(): + click_x = gr.Number(label="X", value=0, minimum=0) + click_y = gr.Number(label="Y", value=0, minimum=0) + with gr.Row(): + click_action_type = gr.Dropdown( + label="Action Type", + choices=["click", "double_click", "move", "left_mouse_up", "left_mouse_down"], + value="click" + ) + click_button = gr.Dropdown( + label="Button (for click only)", + choices=["left", "right", "wheel", "back", "forward"], + value="left" + ) + click_submit_btn = gr.Button("Submit Action") + + with gr.Accordion("📝 Type Action", open=False): + with gr.Group(): + type_text = gr.Textbox( + label="Text to Type", + placeholder="Enter text to type..." + ) + type_submit_btn = gr.Button("Submit Type") + + with gr.Accordion("⌨️ Keypress Action", open=False): + with gr.Group(): + keypress_text = gr.Textbox( + label="Keys", + placeholder="e.g., ctrl+c, alt+tab" + ) + keypress_submit_btn = gr.Button("Submit Keypress") + + with gr.Accordion("🎯 Description Action", open=False): + with gr.Group(): + description_text = gr.Textbox( + label="Element Description", + placeholder="e.g., 'Privacy and security option in left sidebar'" + ) + with gr.Row(): + description_action_type = gr.Dropdown( + label="Action Type", + choices=["click", "double_click", "move", "left_mouse_up", "left_mouse_down"], + value="click" + ) + description_button = gr.Radio( + label="Button (for click only)", + choices=["left", "right", "wheel", "back", "forward"], + value="left" + ) + description_submit_btn = gr.Button("Submit Description Action") + + status_display = gr.Textbox( + label="Status", + interactive=False, + value="Ready to receive calls..." + ) + + # Event handlers + refresh_btn.click( + fn=ui_handler.refresh_pending_calls, + outputs=[call_dropdown, screenshot_image, conversation_chatbot, submit_btn] + ) + + call_dropdown.change( + fn=ui_handler.on_call_selected, + inputs=[call_dropdown], + outputs=[screenshot_image, conversation_chatbot, submit_btn] + ) + + def handle_image_click(evt: gr.SelectData): + if evt.index is not None: + x, y = evt.index + action_type = action_type_radio.value or "click" + button = action_button_radio.value or "left" + result = ui_handler.submit_click_action(x, y, action_type, button) + ui_handler.wait_for_pending_calls() + return result + return "No coordinates selected" + + screenshot_image.select( + fn=handle_image_click, + outputs=[status_display] + ).then( + fn=ui_handler.wait_for_pending_calls, + outputs=[call_dropdown, screenshot_image, conversation_chatbot, submit_btn] + ) + + # Response submission + submit_btn.click( + fn=ui_handler.submit_response, + inputs=[response_text], + outputs=[response_text, status_display] + ).then( + fn=ui_handler.refresh_pending_calls, + outputs=[call_dropdown, screenshot_image, conversation_chatbot, submit_btn] + ) + + # Toggle button radio visibility based on action type + def toggle_button_visibility(action_type): + return gr.update(visible=(action_type == "click")) + + action_type_radio.change( + fn=toggle_button_visibility, + inputs=[action_type_radio], + outputs=[action_button_radio] + ) + + # Action accordion handlers + click_submit_btn.click( + fn=ui_handler.submit_click_action, + inputs=[click_x, click_y, click_action_type, click_button], + outputs=[status_display] + ).then( + fn=ui_handler.wait_for_pending_calls, + outputs=[call_dropdown, screenshot_image, conversation_chatbot, submit_btn] + ) + + type_submit_btn.click( + fn=ui_handler.submit_type_action, + inputs=[type_text], + outputs=[status_display] + ).then( + fn=ui_handler.wait_for_pending_calls, + outputs=[call_dropdown, screenshot_image, conversation_chatbot, submit_btn] + ) + + keypress_submit_btn.click( + fn=ui_handler.submit_hotkey_action, + inputs=[keypress_text], + outputs=[status_display] + ).then( + fn=ui_handler.wait_for_pending_calls, + outputs=[call_dropdown, screenshot_image, conversation_chatbot, submit_btn] + ) + + def handle_description_submit(description, action_type, button): + if description: + result = ui_handler.submit_description_click(description, action_type, button) + ui_handler.wait_for_pending_calls() + return result + return "Please enter a description" + + description_submit_btn.click( + fn=handle_description_submit, + inputs=[description_text, description_action_type, description_button], + outputs=[status_display] + ).then( + fn=ui_handler.wait_for_pending_calls, + outputs=[call_dropdown, screenshot_image, conversation_chatbot, submit_btn] + ) + + # Load initial data + demo.load( + fn=ui_handler.refresh_pending_calls, + outputs=[call_dropdown, screenshot_image, conversation_chatbot, submit_btn] + ) + + return demo + + +if __name__ == "__main__": + demo = create_ui() + demo.queue() + demo.launch(server_name="0.0.0.0", server_port=7860)