Fixes issue #172

This commit is contained in:
Dillon DuPont
2025-05-12 08:54:28 -04:00
parent ee7784e2dd
commit aa01322220
2 changed files with 114 additions and 87 deletions

View File

@@ -133,22 +133,22 @@ class OpenAILoop(BaseLoop):
logger.info("Starting OpenAI loop run")
# Create queue for response streaming
queue = asyncio.Queue()
self.queue = asyncio.Queue()
# Ensure tool manager is initialized
await self.tool_manager.initialize()
# Start loop in background task
self.loop_task = asyncio.create_task(self._run_loop(queue, messages))
self.loop_task = asyncio.create_task(self._run_loop(self.queue, messages))
# Process and yield messages as they arrive
while True:
try:
item = await queue.get()
item = await self.queue.get()
if item is None: # Stop signal
break
yield item
queue.task_done()
self.queue.task_done()
except Exception as e:
logger.error(f"Error processing queue item: {str(e)}")
continue

View File

@@ -463,17 +463,40 @@ class UITARSLoop(BaseLoop):
Yields:
Agent response format
"""
# Initialize the message manager with the provided messages
self.message_manager.messages = messages.copy()
logger.info(f"Starting UITARSLoop run with {len(self.message_manager.messages)} messages")
# Create a task to run the loop
self.loop_task = asyncio.create_task(self._run_loop(messages))
# Yield from the loop task
try:
async for response in self.loop_task:
yield response
logger.info(f"Starting UITARSLoop run with {len(messages)} messages")
# Initialize the message manager with the provided messages
self.message_manager.messages = messages.copy()
# Create queue for response streaming
queue = asyncio.Queue()
# Start loop in background task
self.loop_task = asyncio.create_task(self._run_loop(queue, messages))
# Process and yield messages as they arrive
while True:
try:
item = await queue.get()
if item is None: # Stop signal
break
yield item
queue.task_done()
except Exception as e:
logger.error(f"Error processing queue item: {str(e)}")
continue
# Wait for loop to complete
await self.loop_task
# Send completion message
yield {
"role": "assistant",
"content": "Task completed successfully.",
"metadata": {"title": "✅ Complete"},
}
except Exception as e:
logger.error(f"Error in run method: {str(e)}")
yield {
@@ -482,14 +505,12 @@ class UITARSLoop(BaseLoop):
"metadata": {"title": "❌ Error"},
}
async def _run_loop(self, messages: List[Dict[str, Any]]) -> AsyncGenerator[AgentResponse, None]:
async def _run_loop(self, queue: asyncio.Queue, messages: List[Dict[str, Any]]) -> None:
"""Internal method to run the agent loop with provided messages.
Args:
queue: Queue to put responses into
messages: List of messages in standard OpenAI format
Yields:
Agent response format
"""
# Continue running until explicitly told to stop
running = True
@@ -500,88 +521,94 @@ class UITARSLoop(BaseLoop):
attempt = 0
max_attempts = 3
while running and attempt < max_attempts:
try:
# Create a new turn directory if it's not already created
if not turn_created:
self._create_turn_dir()
turn_created = True
try:
while running and attempt < max_attempts:
try:
# Create a new turn directory if it's not already created
if not turn_created:
self._create_turn_dir()
turn_created = True
# Ensure client is initialized
if self.client is None:
logger.info("Initializing client...")
await self.initialize_client()
# Ensure client is initialized
if self.client is None:
raise RuntimeError("Failed to initialize client")
logger.info("Client initialized successfully")
logger.info("Initializing client...")
await self.initialize_client()
if self.client is None:
raise RuntimeError("Failed to initialize client")
logger.info("Client initialized successfully")
# Get current screen
base64_screenshot = await self._get_current_screen()
# Add screenshot to message history
self.message_manager.add_user_message(
[
{
"type": "image_url",
"image_url": {"url": f"data:image/png;base64,{base64_screenshot}"},
}
]
)
logger.info("Added screenshot to message history")
# Get current screen
base64_screenshot = await self._get_current_screen()
# Add screenshot to message history
self.message_manager.add_user_message(
[
{
"type": "image_url",
"image_url": {"url": f"data:image/png;base64,{base64_screenshot}"},
}
]
)
logger.info("Added screenshot to message history")
# Get system prompt
system_prompt = self._get_system_prompt()
# Get system prompt
system_prompt = self._get_system_prompt()
# Make API call with retries
response = await self._make_api_call(
self.message_manager.messages, system_prompt
)
# Make API call with retries
response = await self._make_api_call(
self.message_manager.messages, system_prompt
)
# Handle the response (may execute actions)
# Returns: (should_continue, action_screenshot_saved)
should_continue, new_screenshot_saved = await self._handle_response(
response, self.message_manager.messages
)
# Handle the response (may execute actions)
# Returns: (should_continue, action_screenshot_saved)
should_continue, new_screenshot_saved = await self._handle_response(
response, self.message_manager.messages
)
# Update whether an action screenshot was saved this turn
action_screenshot_saved = action_screenshot_saved or new_screenshot_saved
agent_response = await to_agent_response_format(
response,
messages,
model=self.model,
)
# Log standardized response for ease of parsing
self._log_api_call("agent_response", request=None, response=agent_response)
yield agent_response
# Check if we should continue this conversation
running = should_continue
# Update whether an action screenshot was saved this turn
action_screenshot_saved = action_screenshot_saved or new_screenshot_saved
agent_response = await to_agent_response_format(
response,
messages,
model=self.model,
)
# Log standardized response for ease of parsing
self._log_api_call("agent_response", request=None, response=agent_response)
# Put the response in the queue
await queue.put(agent_response)
# Check if we should continue this conversation
running = should_continue
# Create a new turn directory if we're continuing
if running:
turn_created = False
# Create a new turn directory if we're continuing
if running:
turn_created = False
# Reset attempt counter on success
attempt = 0
# Reset attempt counter on success
attempt = 0
except Exception as e:
attempt += 1
error_msg = f"Error in run method (attempt {attempt}/{max_attempts}): {str(e)}"
logger.error(error_msg)
except Exception as e:
attempt += 1
error_msg = f"Error in run method (attempt {attempt}/{max_attempts}): {str(e)}"
logger.error(error_msg)
# If this is our last attempt, provide more info about the error
if attempt >= max_attempts:
logger.error(f"Maximum retry attempts reached. Last error was: {str(e)}")
# If this is our last attempt, provide more info about the error
if attempt >= max_attempts:
logger.error(f"Maximum retry attempts reached. Last error was: {str(e)}")
yield {
"role": "assistant",
"content": f"Error: {str(e)}",
"metadata": {"title": "❌ Error"},
}
await queue.put({
"role": "assistant",
"content": f"Error: {str(e)}",
"metadata": {"title": "❌ Error"},
})
# Create a brief delay before retrying
await asyncio.sleep(1)
# Create a brief delay before retrying
await asyncio.sleep(1)
finally:
# Signal that we're done
await queue.put(None)
async def cancel(self) -> None:
"""Cancel the currently running agent loop task.