diff --git a/libs/agent/agent/providers/openai/loop.py b/libs/agent/agent/providers/openai/loop.py index 87719d1b..e791b8c9 100644 --- a/libs/agent/agent/providers/openai/loop.py +++ b/libs/agent/agent/providers/openai/loop.py @@ -133,22 +133,22 @@ class OpenAILoop(BaseLoop): logger.info("Starting OpenAI loop run") # Create queue for response streaming - queue = asyncio.Queue() + self.queue = asyncio.Queue() # Ensure tool manager is initialized await self.tool_manager.initialize() # Start loop in background task - self.loop_task = asyncio.create_task(self._run_loop(queue, messages)) + self.loop_task = asyncio.create_task(self._run_loop(self.queue, messages)) # Process and yield messages as they arrive while True: try: - item = await queue.get() + item = await self.queue.get() if item is None: # Stop signal break yield item - queue.task_done() + self.queue.task_done() except Exception as e: logger.error(f"Error processing queue item: {str(e)}") continue diff --git a/libs/agent/agent/providers/uitars/loop.py b/libs/agent/agent/providers/uitars/loop.py index 3766cd92..133a3b83 100644 --- a/libs/agent/agent/providers/uitars/loop.py +++ b/libs/agent/agent/providers/uitars/loop.py @@ -463,17 +463,40 @@ class UITARSLoop(BaseLoop): Yields: Agent response format """ - # Initialize the message manager with the provided messages - self.message_manager.messages = messages.copy() - logger.info(f"Starting UITARSLoop run with {len(self.message_manager.messages)} messages") - - # Create a task to run the loop - self.loop_task = asyncio.create_task(self._run_loop(messages)) - - # Yield from the loop task try: - async for response in self.loop_task: - yield response + logger.info(f"Starting UITARSLoop run with {len(messages)} messages") + + # Initialize the message manager with the provided messages + self.message_manager.messages = messages.copy() + + # Create queue for response streaming + queue = asyncio.Queue() + + # Start loop in background task + self.loop_task = asyncio.create_task(self._run_loop(queue, messages)) + + # Process and yield messages as they arrive + while True: + try: + item = await queue.get() + if item is None: # Stop signal + break + yield item + queue.task_done() + except Exception as e: + logger.error(f"Error processing queue item: {str(e)}") + continue + + # Wait for loop to complete + await self.loop_task + + # Send completion message + yield { + "role": "assistant", + "content": "Task completed successfully.", + "metadata": {"title": "✅ Complete"}, + } + except Exception as e: logger.error(f"Error in run method: {str(e)}") yield { @@ -482,14 +505,12 @@ class UITARSLoop(BaseLoop): "metadata": {"title": "❌ Error"}, } - async def _run_loop(self, messages: List[Dict[str, Any]]) -> AsyncGenerator[AgentResponse, None]: + async def _run_loop(self, queue: asyncio.Queue, messages: List[Dict[str, Any]]) -> None: """Internal method to run the agent loop with provided messages. Args: + queue: Queue to put responses into messages: List of messages in standard OpenAI format - - Yields: - Agent response format """ # Continue running until explicitly told to stop running = True @@ -500,88 +521,94 @@ class UITARSLoop(BaseLoop): attempt = 0 max_attempts = 3 - while running and attempt < max_attempts: - try: - # Create a new turn directory if it's not already created - if not turn_created: - self._create_turn_dir() - turn_created = True + try: + while running and attempt < max_attempts: + try: + # Create a new turn directory if it's not already created + if not turn_created: + self._create_turn_dir() + turn_created = True - # Ensure client is initialized - if self.client is None: - logger.info("Initializing client...") - await self.initialize_client() + # Ensure client is initialized if self.client is None: - raise RuntimeError("Failed to initialize client") - logger.info("Client initialized successfully") + logger.info("Initializing client...") + await self.initialize_client() + if self.client is None: + raise RuntimeError("Failed to initialize client") + logger.info("Client initialized successfully") - # Get current screen - base64_screenshot = await self._get_current_screen() - - # Add screenshot to message history - self.message_manager.add_user_message( - [ - { - "type": "image_url", - "image_url": {"url": f"data:image/png;base64,{base64_screenshot}"}, - } - ] - ) - logger.info("Added screenshot to message history") + # Get current screen + base64_screenshot = await self._get_current_screen() + + # Add screenshot to message history + self.message_manager.add_user_message( + [ + { + "type": "image_url", + "image_url": {"url": f"data:image/png;base64,{base64_screenshot}"}, + } + ] + ) + logger.info("Added screenshot to message history") - # Get system prompt - system_prompt = self._get_system_prompt() + # Get system prompt + system_prompt = self._get_system_prompt() - # Make API call with retries - response = await self._make_api_call( - self.message_manager.messages, system_prompt - ) + # Make API call with retries + response = await self._make_api_call( + self.message_manager.messages, system_prompt + ) - # Handle the response (may execute actions) - # Returns: (should_continue, action_screenshot_saved) - should_continue, new_screenshot_saved = await self._handle_response( - response, self.message_manager.messages - ) + # Handle the response (may execute actions) + # Returns: (should_continue, action_screenshot_saved) + should_continue, new_screenshot_saved = await self._handle_response( + response, self.message_manager.messages + ) - # Update whether an action screenshot was saved this turn - action_screenshot_saved = action_screenshot_saved or new_screenshot_saved - - agent_response = await to_agent_response_format( - response, - messages, - model=self.model, - ) - # Log standardized response for ease of parsing - self._log_api_call("agent_response", request=None, response=agent_response) - yield agent_response - - # Check if we should continue this conversation - running = should_continue + # Update whether an action screenshot was saved this turn + action_screenshot_saved = action_screenshot_saved or new_screenshot_saved + + agent_response = await to_agent_response_format( + response, + messages, + model=self.model, + ) + # Log standardized response for ease of parsing + self._log_api_call("agent_response", request=None, response=agent_response) + + # Put the response in the queue + await queue.put(agent_response) + + # Check if we should continue this conversation + running = should_continue - # Create a new turn directory if we're continuing - if running: - turn_created = False + # Create a new turn directory if we're continuing + if running: + turn_created = False - # Reset attempt counter on success - attempt = 0 + # Reset attempt counter on success + attempt = 0 - except Exception as e: - attempt += 1 - error_msg = f"Error in run method (attempt {attempt}/{max_attempts}): {str(e)}" - logger.error(error_msg) + except Exception as e: + attempt += 1 + error_msg = f"Error in run method (attempt {attempt}/{max_attempts}): {str(e)}" + logger.error(error_msg) - # If this is our last attempt, provide more info about the error - if attempt >= max_attempts: - logger.error(f"Maximum retry attempts reached. Last error was: {str(e)}") + # If this is our last attempt, provide more info about the error + if attempt >= max_attempts: + logger.error(f"Maximum retry attempts reached. Last error was: {str(e)}") - yield { - "role": "assistant", - "content": f"Error: {str(e)}", - "metadata": {"title": "❌ Error"}, - } + await queue.put({ + "role": "assistant", + "content": f"Error: {str(e)}", + "metadata": {"title": "❌ Error"}, + }) - # Create a brief delay before retrying - await asyncio.sleep(1) + # Create a brief delay before retrying + await asyncio.sleep(1) + finally: + # Signal that we're done + await queue.put(None) async def cancel(self) -> None: """Cancel the currently running agent loop task.