agent example

This commit is contained in:
Dillon DuPont
2025-05-13 16:03:43 -04:00
parent c8b3ce1e26
commit 5ccaa5f64a
4 changed files with 272 additions and 101 deletions

View File

@@ -0,0 +1,3 @@
from .diorama import Diorama
__all__ = ["Diorama"]

View File

@@ -0,0 +1,41 @@
import asyncio
from diorama import Diorama
from pathlib import Path
from dotenv import load_dotenv
load_dotenv(Path("~/cua/.env.local").expanduser())
from agent import AgentLoop, ComputerAgent as Agent, LLM, LLMProvider
async def main():
# diorama's are virtual desktops, they allow you to control multiple apps at once
diorama1 = Diorama.create_from_apps("Terminal")
diorama2 = Diorama.create_from_apps("Notes")
diorama3 = Diorama.create_from_apps("Safari")
diorama4 = Diorama.create_from_apps("Calendar")
agents = [
Agent(
computer=diorama1,
model=LLM("openai", "computer-use-preview"),
loop=AgentLoop.OPENAI
),
Agent(diorama2, LLM("anthropic", "claude-3-7-sonnet-20250219"), AgentLoop.ANTHROPIC),
Agent(diorama3, LLM("openai", "gpt-4.1-nano"), AgentLoop.OMNI),
Agent(diorama4, LLM("oaicompat", "tgi", os.getenv("UITARS_BASE_URL")), AgentLoop.UITARS)
]
tasks = [
"In Terminal, run 'echo Hello World'",
"In Notes, create a new note with the title 'Test' and the content 'This is a test note.'",
"In Safari, go to https://www.google.com",
"In Calendar, create a new event with the title 'Test' and the content 'This is a test event.'"
]
async for response in asyncio.gather(*[agent.run(task) for agent, task in zip(agents, tasks)]):
print(response)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,11 +1,19 @@
#!/usr/bin/env python3
"""Diorama: A virtual desktop manager for macOS"""
import os
import asyncio
import logging
import sys
import io
from typing import Union
from PIL import Image
from draw import capture_all_apps
from draw import capture_all_apps, AppActivationContext, get_frontmost_and_active_app, get_all_windows, get_running_apps
from diorama_computer import DioramaComputer
from computer_server.handlers.macos import *
from agent import ComputerAgent, LLM, LLMProvider, AgentLoop
# simple, nicely formatted logging
logging.basicConfig(
@@ -16,15 +24,60 @@ logging.basicConfig(
)
logger = logging.getLogger("diorama.virtual_desktop")
automation_handler = MacOSAutomationHandler()
class AgentFactory:
def __init__(self, diorama):
self.diorama = diorama
def create_agent(self, loop: AgentLoop, model: LLM):
return ComputerAgent(
computer=self.diorama.computer,
loop=loop,
model=model
)
def openai(self):
return self.create_agent(AgentLoop.OPENAI, LLM(
provider=LLMProvider.OPENAI,
name="computer-use-preview"
))
def anthropic(self):
return self.create_agent(AgentLoop.ANTHROPIC, LLM(
provider=LLMProvider.ANTHROPIC,
))
def openai_omni(self, model_name):
return self.create_agent(AgentLoop.OMNI, LLM(
provider=LLMProvider.OPENAI,
name=model_name
))
def uitars(self):
return self.create_agent(AgentLoop.UITARS, LLM(
provider=LLMProvider.OAICOMPAT,
name="tgi",
provider_base_url=os.getenv("UITARS_BASE_URL")
))
class Diorama:
_scheduler_queue = None
_scheduler_task = None
_loop = None
_scheduler_started = False
@classmethod
def create_from_apps(cls, *args) -> DioramaComputer:
cls._ensure_scheduler()
return cls(args).computer
def __init__(self, app_list):
self.app_list = app_list
self.agent = AgentFactory(self)
self.interface = self.Interface(self)
self.computer = DioramaComputer(self)
self.focus_context = None
@classmethod
def _ensure_scheduler(cls):
@@ -43,68 +96,157 @@ class Diorama:
args = cmd.get("arguments", {})
future = cmd.get("future")
logger.info(f"Processing command: {action} | args={args}")
if action == "screenshot":
app_whitelist = args.get("app_list", [])
all_windows = get_all_windows()
running_apps = get_running_apps()
frontmost_app, active_app_to_use, active_app_pid = get_frontmost_and_active_app(all_windows, running_apps, app_whitelist)
focus_context = AppActivationContext(active_app_pid, active_app_to_use, logger)
with focus_context:
try:
app_whitelist = list(args["app_list"]) + ["Window Server", "Dock"]
logger.info(f"Taking screenshot for apps: {app_whitelist}")
result, img = capture_all_apps(
save_to_disk=args.get("save_to_disk", False),
app_whitelist=app_whitelist,
output_dir=args.get("output_dir"),
take_focus=args.get("take_focus", True)
)
logger.info("Screenshot complete.")
if future:
future.set_result((result, img))
if action == "screenshot":
app_whitelist = list(args["app_list"]) + ["Window Server", "Dock"]
logger.info(f"Taking screenshot for apps: {app_whitelist}")
result, img = capture_all_apps(
app_whitelist=app_whitelist,
save_to_disk=False,
take_focus=False
)
logger.info("Screenshot complete.")
if future:
future.set_result((result, img))
# Mouse actions
elif action in ["left_click", "right_click", "double_click", "move_cursor", "drag_to"]:
x = args.get("x")
y = args.get("y")
duration = args.get("duration", 0.5)
if action == "left_click":
await automation_handler.left_click(x, y)
elif action == "right_click":
await automation_handler.right_click(x, y)
elif action == "double_click":
await automation_handler.double_click(x, y)
elif action == "move_cursor":
await automation_handler.move_cursor(x, y)
elif action == "drag_to":
await automation_handler.drag_to(x, y, duration=duration)
if future:
future.set_result(None)
# Keyboard actions
elif action == "type_text":
text = args.get("text")
await automation_handler.type_text(text)
if future:
future.set_result(None)
elif action == "press_key":
key = args.get("key")
await automation_handler.press_key(key)
if future:
future.set_result(None)
elif action == "hotkey":
keys = args.get("keys", [])
await automation_handler.hotkey(keys)
if future:
future.set_result(None)
elif action == "get_cursor_position":
pos = await automation_handler.get_cursor_position()
if future:
future.set_result(pos)
else:
logger.warning(f"Unknown action: {action}")
if future:
future.set_exception(ValueError(f"Unknown action: {action}"))
except Exception as e:
logger.error(f"Exception during screenshot: {e}", exc_info=True)
logger.error(f"Exception during {action}: {e}", exc_info=True)
if future:
future.set_exception(e)
else:
logger.warning(f"Unknown action: {action}")
if future:
future.set_exception(ValueError(f"Unknown action: {action}"))
@classmethod
def create_from_apps(cls, app_list):
cls._ensure_scheduler()
return cls(app_list)
class Interface:
class Interface():
def __init__(self, diorama):
self._diorama = diorama
self.hitboxes = []
self._scene_hitboxes = []
self._scene_size = None
async def screenshot(self, save_to_disk=False, output_dir=None, take_focus=True):
async def _send_cmd(self, action, arguments=None):
Diorama._ensure_scheduler()
loop = asyncio.get_event_loop()
future = loop.create_future()
logger.info(f"Enqueuing screenshot command for apps: {self._diorama.app_list}")
logger.info(f"Enqueuing {action} command for apps: {self._diorama.app_list}")
await Diorama._scheduler_queue.put({
"action": "screenshot",
"arguments": {
"app_list": self._diorama.app_list,
"save_to_disk": save_to_disk,
"output_dir": output_dir,
"take_focus": take_focus
},
"action": action,
"arguments": {"app_list": self._diorama.app_list, **(arguments or {})},
"future": future
})
result, img = await future
# Store hitboxes after screenshot
self.hitboxes = result.get("hitboxes", [])
return result, img
return await future
async def screenshot(self, as_bytes: bool = True) -> Union[bytes, Image]:
result, img = await self._send_cmd("screenshot")
self._scene_hitboxes = result.get("hitboxes", [])
self._scene_size = img.size
if as_bytes:
# PIL Image to bytes
img_byte_arr = io.BytesIO()
img.save(img_byte_arr, format="PNG")
img_byte_arr = img_byte_arr.getvalue()
return img_byte_arr
else:
return img
async def left_click(self, x, y):
sx, sy = await self.to_screen_coordinates(x, y)
await self._send_cmd("left_click", {"x": sx, "y": sy})
async def right_click(self, x, y):
sx, sy = await self.to_screen_coordinates(x, y)
await self._send_cmd("right_click", {"x": sx, "y": sy})
async def double_click(self, x, y):
sx, sy = await self.to_screen_coordinates(x, y)
await self._send_cmd("double_click", {"x": sx, "y": sy})
async def move_cursor(self, x, y):
sx, sy = await self.to_screen_coordinates(x, y)
await self._send_cmd("move_cursor", {"x": sx, "y": sy})
async def drag_to(self, x, y, duration=0.5):
sx, sy = await self.to_screen_coordinates(x, y)
await self._send_cmd("drag_to", {"x": sx, "y": sy, "duration": duration})
async def get_cursor_position(self):
return await self._send_cmd("get_cursor_position")
async def type_text(self, text):
await self._send_cmd("type_text", {"text": text})
async def press_key(self, key):
await self._send_cmd("press_key", {"key": key})
async def hotkey(self, *keys):
await self._send_cmd("hotkey", {"keys": list(keys)})
async def get_screen_size(self) -> dict[str, int]:
if not self._scene_size:
await self.screenshot()
return { "width": self._scene_size[0], "height": self._scene_size[1] }
async def to_screen_coordinates(self, x: float, y: float) -> tuple[float, float]:
"""Convert screenshot coordinates to screen coordinates.
Args:
x: X absolute coordinate in screenshot space
y: Y absolute coordinate in screenshot space
Returns:
tuple[float, float]: (x, y) absolute coordinates in screen space
"""
Convert screenshot-relative coordinates (x, y) to absolute screen coordinates.
Find the first hitbox whose 'hitbox' contains the mapped (abs_x, abs_y).
If none found, return input.
"""
if not self.hitboxes:
if not self._scene_hitboxes:
await self.screenshot() # get hitboxes
# Try all hitboxes
for h in self.hitboxes:
for h in self._scene_hitboxes:
rect = h.get("hitbox")
if not rect or len(rect) != 4:
continue
@@ -119,14 +261,18 @@ class Diorama:
return x, y
async def to_screenshot_coordinates(self, x: float, y: float) -> tuple[float, float]:
"""Convert screen coordinates to screenshot coordinates.
Args:
x: X absolute coordinate in screen space
y: Y absolute coordinate in screen space
Returns:
tuple[float, float]: (x, y) absolute coordinates in screenshot space
"""
Convert absolute screen coordinates (x, y) to screenshot-relative coordinates (normalized to [0, 1]).
Find the first hitbox whose 'target' contains (x, y).
If none found, return input.
"""
if not self.hitboxes:
if not self._scene_hitboxes:
await self.screenshot() # get hitboxes
for h in self.hitboxes:
for h in self._scene_hitboxes:
rect = h.get("target")
if not rect or len(rect) != 4:
continue
@@ -144,60 +290,14 @@ async def main():
from draw import capture_all_apps
desktop1 = Diorama.create_from_apps(["Discord", "Notes"])
desktop2 = Diorama.create_from_apps(["Terminal"])
# Take full screen screenshot (no app whitelist)
result_full, img_full = capture_all_apps()
# Take desktop1 screenshot
result1, img1 = await desktop1.interface.screenshot()
img1 = await desktop1.interface.screenshot(as_bytes=False)
img2 = await desktop2.interface.screenshot(as_bytes=False)
# Pick a sample normalized screenshot coordinate
test_screenshot_coord = (0.5, 0.5) # center
# Convert to screen coordinates using desktop1 (should map to full screenshot)
screen_coord = await desktop1.interface.to_screen_coordinates(*test_screenshot_coord)
# Convert back to screenshot coordinates on desktop1
screenshot_coord_back = await desktop1.interface.to_screenshot_coordinates(*screen_coord)
img1.save("app_screenshots/desktop1.png")
img2.save("app_screenshots/desktop2.png")
# Draw on full screenshot: the mapped screen coordinate
img_full = img_full.convert("RGBA")
img1 = img1.convert("RGBA")
width_full, height_full = img_full.size
width1, height1 = img1.size
x_screen, y_screen = int(screen_coord[0]), int(screen_coord[1])
x1, y1 = int(screenshot_coord_back[0] * width1), int(screenshot_coord_back[1] * height1)
draw_full = ImageDraw.Draw(img_full)
r = 12
draw_full.ellipse([(x_screen - r, y_screen - r), (x_screen + r, y_screen + r)], fill=(255,0,0,200), outline=(0,0,0,255))
draw_full.text((x_screen + r, y_screen), "screen coord", fill=(255,0,0,255))
draw1 = ImageDraw.Draw(img1)
draw1.ellipse([(x1 - r, y1 - r), (x1 + r, y1 + r)], fill=(0,0,255,200), outline=(0,0,0,255))
draw1.text((x1 + r, y1), f"screenshot coord", fill=(0,0,255,255))
# Create a new image side by side
total_width = img_full.width + img1.width
max_height = max(img_full.height, img1.height)
combined = Image.new("RGBA", (total_width, max_height), (255,255,255,255))
combined.paste(img_full, (0, 0))
combined.paste(img1, (img_full.width, 0))
# Draw an arrow from the point in img_full to the point in img1
arrow_draw = ImageDraw.Draw(combined)
start = (x_screen, y_screen)
end = (x1 + img_full.width, y1)
arrow_draw.line([start, end], fill=(0,128,0,255), width=3)
# Arrowhead
def draw_arrowhead(draw, start, end, color, size=15):
import math
angle = math.atan2(end[1] - start[1], end[0] - start[0])
for a in [math.pi/8, -math.pi/8]:
x = end[0] - size * math.cos(angle + a)
y = end[1] - size * math.sin(angle + a)
draw.line([end, (x, y)], fill=color, width=3)
draw_arrowhead(arrow_draw, start, end, (0,128,0,255))
combined.save("coord_mapping_demo.png")
print("Saved coordinate mapping demo to coord_mapping_demo.png")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,27 @@
import asyncio
class DioramaComputer:
"""
A minimal Computer-like interface for Diorama, compatible with ComputerAgent.
Implements _initialized, run(), and __aenter__ for agent compatibility.
"""
def __init__(self, diorama):
self.diorama = diorama
self.interface = self.diorama.interface
self.agent = self.diorama.agent
self._initialized = False
async def __aenter__(self):
# Ensure the event loop is running (for compatibility)
try:
asyncio.get_running_loop()
except RuntimeError:
asyncio.set_event_loop(asyncio.new_event_loop())
self._initialized = True
return self
async def run(self):
# This is a stub for compatibility
if not self._initialized:
await self.__aenter__()
return self