Fixes issue #172

This commit is contained in:
Dillon DuPont
2025-05-12 08:55:13 -04:00
parent aa01322220
commit 1b1eb81374

View File

@@ -581,17 +581,40 @@ class OmniLoop(BaseLoop):
Yields:
Agent response format
"""
# Initialize the message manager with the provided messages
self.message_manager.messages = messages.copy()
logger.info(f"Starting OmniLoop run with {len(self.message_manager.messages)} messages")
# Create a task to run the loop
self.loop_task = asyncio.create_task(self._run_loop(messages))
# Yield from the loop task
try:
async for response in self.loop_task:
yield response
logger.info(f"Starting OmniLoop run with {len(messages)} messages")
# Initialize the message manager with the provided messages
self.message_manager.messages = messages.copy()
# Create queue for response streaming
queue = asyncio.Queue()
# Start loop in background task
self.loop_task = asyncio.create_task(self._run_loop(queue, messages))
# Process and yield messages as they arrive
while True:
try:
item = await queue.get()
if item is None: # Stop signal
break
yield item
queue.task_done()
except Exception as e:
logger.error(f"Error processing queue item: {str(e)}")
continue
# Wait for loop to complete
await self.loop_task
# Send completion message
yield {
"role": "assistant",
"content": "Task completed successfully.",
"metadata": {"title": "✅ Complete"},
}
except Exception as e:
logger.error(f"Error in run method: {str(e)}")
yield {
@@ -600,14 +623,12 @@ class OmniLoop(BaseLoop):
"metadata": {"title": "❌ Error"},
}
async def _run_loop(self, messages: List[Dict[str, Any]]) -> AsyncGenerator[AgentResponse, None]:
async def _run_loop(self, queue: asyncio.Queue, messages: List[Dict[str, Any]]) -> None:
"""Internal method to run the agent loop with provided messages.
Args:
queue: Queue to put responses into
messages: List of messages in standard OpenAI format
Yields:
Agent response format
"""
# Continue running until explicitly told to stop
running = True
@@ -698,8 +719,8 @@ class OmniLoop(BaseLoop):
# Log standardized response for ease of parsing
self._log_api_call("agent_response", request=None, response=openai_compatible_response)
# Yield the response to the caller
yield openai_compatible_response
# Put the response in the queue
await queue.put(openai_compatible_response)
# Check if we should continue this conversation
running = should_continue
@@ -720,14 +741,17 @@ class OmniLoop(BaseLoop):
if attempt >= max_attempts:
logger.error(f"Maximum retry attempts reached. Last error was: {str(e)}")
yield {
await queue.put({
"role": "assistant",
"content": f"Error: {str(e)}",
"metadata": {"title": "❌ Error"},
}
})
# Create a brief delay before retrying
await asyncio.sleep(1)
finally:
# Signal that we're done
await queue.put(None)
async def cancel(self) -> None:
"""Cancel the currently running agent loop task.