fixed hitboxes and agent example

This commit is contained in:
Dillon DuPont
2025-05-14 09:34:16 -04:00
parent 506347723a
commit 2c047215e1
3 changed files with 177 additions and 58 deletions

View File

@@ -1,41 +1,68 @@
import asyncio
from diorama import Diorama
from pathlib import Path
from dotenv import load_dotenv
load_dotenv(Path("~/cua/.env.local").expanduser())
from agent import AgentLoop, ComputerAgent as Agent, LLM, LLMProvider
from PIL import Image
import rpack
async def make_mosaic(dioramas):
    """Create a blank canvas sized to hold every diorama's screen.

    Each diorama is asked for its screen size through
    ``interface.get_screen_size()``; the resulting ``(width, height)`` pairs
    are handed to ``rpack.pack`` to obtain one ``(x, y)`` position per
    diorama (presumably the top-left corner of a non-overlapping layout --
    confirm against the rpack documentation).

    Args:
        dioramas: Iterable of objects exposing
            ``interface.get_screen_size()``, an awaitable returning a mapping
            with ``'width'`` and ``'height'`` keys.

    Returns:
        A ``(mosaic, positions)`` tuple: a dark-grey RGBA ``Image`` spanning
        the bounding box of all packed rectangles, and the positions returned
        by ``rpack.pack`` in the same order as ``dioramas``.
    """
    # Screen sizes are queried one after another, in input order.
    dims = []
    for diorama in dioramas:
        screen = await diorama.interface.get_screen_size()
        dims.append((screen['width'], screen['height']))

    positions = rpack.pack(dims)

    # The canvas must reach the far edge of the right-most / bottom-most tile.
    placed = list(zip(positions, dims))
    canvas_width = max(left + width for (left, _), (width, _) in placed)
    canvas_height = max(top + height for (_, top), (_, height) in placed)

    mosaic = Image.new("RGBA", (canvas_width, canvas_height), (30, 30, 30, 255))
    return mosaic, positions
async def main():
    """Run one agent per diorama concurrently and mirror their screens into a mosaic.

    Four dioramas are created (Safari, Notes, Calculator, Terminal), each gets
    an agent from ``diorama.agent.openai()``, and every agent is given one
    task. A shared mosaic image is written to
    ``~/cua/notebooks/app_screenshots/mosaic.png`` and re-saved whenever a
    diorama's tile is refreshed.
    """
    # Dioramas are virtual desktops; they let you control multiple apps at once.
    diorama1 = Diorama.create_from_apps("Safari")
    diorama2 = Diorama.create_from_apps("Notes")
    diorama3 = Diorama.create_from_apps("Calculator")
    diorama4 = Diorama.create_from_apps("Terminal")
    dioramas = [diorama1, diorama2, diorama3, diorama4]

    # Create one agent per diorama, in the same order as `dioramas`.
    agents = [diorama.agent.openai() for diorama in dioramas]

    # Resolve the output path once instead of rebuilding it for every save.
    mosaic_path = Path("~/cua/notebooks/app_screenshots/mosaic.png").expanduser()
    mosaic, draw_positions = await make_mosaic(dioramas)
    mosaic.save(mosaic_path)

    # tasks[i] is executed by agents[i] on dioramas[i].
    tasks = [
        "In Safari, find a cat picture",
        "In Notes, make a note named 'Test' and draw an ASCII dog",
        "In Calculator, add 2 + 2",
        "In Terminal, type 'ls' and press enter",
    ]

    async def refresh_tile(diorama_idx):
        """Paste a fresh screenshot of one diorama into its mosaic slot and save."""
        screenshot = await dioramas[diorama_idx].interface.screenshot(as_bytes=False)
        mosaic.paste(screenshot, draw_positions[diorama_idx])
        mosaic.save(mosaic_path)

    async def run_agent(agent, task, diorama_idx):
        """Drive one agent through its task, keeping its mosaic tile current."""
        # Start with a screenshot so the tile is populated before the agent acts.
        await refresh_tile(diorama_idx)
        async for response in agent.run(task):
            print(response)
            # NOTE(review): indentation was lost in the reviewed view; the
            # refresh is assumed to run after every response -- confirm.
            await refresh_tile(diorama_idx)

    # Run all agents concurrently.
    await asyncio.gather(
        *(
            run_agent(agent, task, idx)
            for idx, (agent, task) in enumerate(zip(agents, tasks))
        )
    )


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -7,7 +7,7 @@ import logging
import sys
import io
from typing import Union
from PIL import Image
from PIL import Image, ImageDraw
from draw import capture_all_apps, AppActivationContext, get_frontmost_and_active_app, get_all_windows, get_running_apps
@@ -107,7 +107,7 @@ class Diorama:
with focus_context:
try:
if action == "screenshot":
app_whitelist = list(args["app_list"]) + ["Window Server", "Dock"]
app_whitelist = list(args["app_list"])
logger.info(f"Taking screenshot for apps: {app_whitelist}")
result, img = capture_all_apps(
app_whitelist=app_whitelist,
@@ -180,7 +180,11 @@ class Diorama:
"arguments": {"app_list": self._diorama.app_list, **(arguments or {})},
"future": future
})
return await future
try:
return await future
except asyncio.CancelledError:
logger.warning(f"Command was cancelled: {action}")
return None
async def screenshot(self, as_bytes: bool = True) -> Union[bytes, Image]:
result, img = await self._send_cmd("screenshot")
@@ -246,18 +250,28 @@ class Diorama:
if not self._scene_hitboxes:
await self.screenshot() # get hitboxes
# Try all hitboxes
for h in self._scene_hitboxes:
rect = h.get("hitbox")
if not rect or len(rect) != 4:
for h in self._scene_hitboxes[::-1]:
rect_from = h.get("hitbox")
rect_to = h.get("target")
if not rect_from or len(rect_from) != 4:
continue
x0, y0, x1, y1 = rect
width = x1 - x0
height = y1 - y0
abs_x = x0 + x * width
abs_y = y0 + y * height
# Check if (abs_x, abs_y) is inside this hitbox
if x0 <= abs_x <= x1 and y0 <= abs_y <= y1:
return abs_x, abs_y
# check if (x, y) is inside rect_from
x0, y0, x1, y1 = rect_from
if x0 <= x <= x1 and y0 <= y <= y1:
logger.info(f"Found hitbox: {h}")
# remap (x, y) to rect_to
tx0, ty0, tx1, ty1 = rect_to
# calculate offset from x0, y0
offset_x = x - x0
offset_y = y - y0
# remap offset to rect_to
tx = tx0 + offset_x
ty = ty0 + offset_y
return tx, ty
return x, y
async def to_screenshot_coordinates(self, x: float, y: float) -> tuple[float, float]:
@@ -272,23 +286,34 @@ class Diorama:
"""
if not self._scene_hitboxes:
await self.screenshot() # get hitboxes
for h in self._scene_hitboxes:
rect = h.get("target")
if not rect or len(rect) != 4:
# Try all hitboxes
for h in self._scene_hitboxes[::-1]:
rect_from = h.get("target")
rect_to = h.get("hitbox")
if not rect_from or len(rect_from) != 4:
continue
x0, y0, x1, y1 = rect
width = x1 - x0
height = y1 - y0
# check if (x, y) is inside rect_from
x0, y0, x1, y1 = rect_from
if x0 <= x <= x1 and y0 <= y <= y1:
rel_x = (x - x0) / width if width else 0.0
rel_y = (y - y0) / height if height else 0.0
return rel_x, rel_y
# remap (x, y) to rect_to
tx0, ty0, tx1, ty1 = rect_to
# calculate offset from x0, y0
offset_x = x - x0
offset_y = y - y0
# remap offset to rect_to
tx = tx0 + offset_x
ty = ty0 + offset_y
return tx, ty
return x, y
async def main():
from PIL import Image, ImageDraw
from draw import capture_all_apps
import pyautogui
import time
async def main():
desktop1 = Diorama.create_from_apps(["Discord", "Notes"])
desktop2 = Diorama.create_from_apps(["Terminal"])
@@ -297,7 +322,71 @@ async def main():
img1.save("app_screenshots/desktop1.png")
img2.save("app_screenshots/desktop2.png")
# Initialize Diorama desktop
desktop3 = Diorama.create_from_apps("Safari")
screen_size = await desktop3.interface.get_screen_size()
print(screen_size)
# Take initial screenshot
img = await desktop3.interface.screenshot(as_bytes=False)
img.save("app_screenshots/desktop3.png")
# Prepare hitboxes and draw on the single screenshot
hitboxes = desktop3.interface._scene_hitboxes[::-1]
base_img = img.copy()
draw = ImageDraw.Draw(base_img)
for h in hitboxes:
rect = h.get("hitbox")
if not rect or len(rect) != 4:
continue
draw.rectangle(rect, outline="red", width=2)
# Track and draw mouse position in real time (single screenshot size)
last_mouse_pos = None
print("Tracking mouse... Press Ctrl+C to stop.")
try:
while True:
mouse_x, mouse_y = pyautogui.position()
if last_mouse_pos != (mouse_x, mouse_y):
last_mouse_pos = (mouse_x, mouse_y)
# Map to screenshot coordinates
sx, sy = await desktop3.interface.to_screenshot_coordinates(mouse_x, mouse_y)
# Draw on a copy of the screenshot
frame = base_img.copy()
frame_draw = ImageDraw.Draw(frame)
frame_draw.ellipse((sx-5, sy-5, sx+5, sy+5), fill="blue", outline="blue")
# Save the frame
frame.save("app_screenshots/desktop3_mouse.png")
print(f"Mouse at screen ({mouse_x}, {mouse_y}) -> screenshot ({sx:.1f}, {sy:.1f})")
time.sleep(0.05) # Throttle updates to ~20 FPS
except KeyboardInterrupt:
print("Stopped tracking.")
draw.text((rect[0], rect[1]), str(idx), fill="red")
canvas.save("app_screenshots/desktop3_hitboxes.png")
# move mouse in a square spiral around the screen
import math
import random
step = 20 # pixels per move
dot_radius = 10
width = screen_size["width"]
height = screen_size["height"]
x, y = 0, 10
while x < width and y < height:
await desktop3.interface.move_cursor(x, y)
img = await desktop3.interface.screenshot(as_bytes=False)
draw = ImageDraw.Draw(img)
draw.ellipse((x-dot_radius, y-dot_radius, x+dot_radius, y+dot_radius), fill="red")
img.save("current.png")
await asyncio.sleep(0.03)
x += step
y = math.sin(x / width * math.pi * 2) * 50 + 25
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -465,7 +465,7 @@ def draw_desktop_screenshot(app_whitelist: List[str] = None, all_windows: List[D
_draw_layer(cg_context, first_pass_windows, app_source_rect, app_target_rect)
hitboxes.append({
"hitbox": [0, 0, app_bounds["width"], app_bounds["height"]],
"hitbox": [0, menubar_bounds["height"], app_bounds["width"], menubar_bounds["height"] + app_bounds["height"]],
"target": [
app_source_rect.origin.x,
app_source_rect.origin.y,
@@ -505,6 +505,8 @@ def draw_desktop_screenshot(app_whitelist: List[str] = None, all_windows: List[D
elif item["subrole"] == "AXMinimizedWindowDockItem":
if not any(window["name"] == item["title"] and window["role"] == "app" and window["owner"] in app_whitelist for window in all_windows):
continue
elif item["subrole"] == "AXFolderDockItem":
continue
# Preserve unscaled (original) source position and size before any modification
hitbox_position = source_position
@@ -1031,6 +1033,12 @@ def capture_all_apps(save_to_disk: bool = False, app_whitelist: List[str] = None
# DEBUG: Save hitboxes to disk
if desktop_screenshot and save_to_disk and output_dir:
desktop_path = os.path.join(output_dir, "desktop.png")
desktop_screenshot.save(desktop_path)
result["desktop_screenshot"] = desktop_path
logger.info(f"Saved desktop screenshot to {desktop_path}")
if app_whitelist:
# Take screenshot without whitelist
desktop_screenshot_full, hitboxes_full = draw_desktop_screenshot(
@@ -1053,10 +1061,6 @@ def capture_all_apps(save_to_disk: bool = False, app_whitelist: List[str] = None
combined.save(side_by_side_path)
result["side_by_side_hitboxes"] = side_by_side_path
else:
desktop_path = os.path.join(output_dir, "desktop.png")
desktop_screenshot.save(desktop_path)
result["desktop_screenshot"] = desktop_path
# Overlay hitboxes using new function
hitbox_img = _draw_hitboxes(desktop_screenshot.copy(), hitboxes, key="hitbox")
hitbox_path = os.path.join(output_dir, "hitboxes.png")
@@ -1099,9 +1103,8 @@ async def run_capture():
groups.append(group)
screenshots = []
for group in groups:
app_whitelist = group + ["Window Server", "Dock"]
print(f"Capturing for apps: {app_whitelist}")
_, img = capture_all_apps(app_whitelist=app_whitelist)
print(f"Capturing for apps: {group}")
_, img = capture_all_apps(app_whitelist=group)
if img:
screenshots.append((group, img))
if not screenshots: