From 2c047215e18fe9213a50382d7deabf1e7555cc6f Mon Sep 17 00:00:00 2001 From: Dillon DuPont Date: Wed, 14 May 2025 09:34:16 -0400 Subject: [PATCH] fixed hitboxes and agent example --- notebooks/diorama/agent_example.py | 75 ++++++++++----- notebooks/diorama/diorama.py | 141 +++++++++++++++++++++++------ notebooks/diorama/draw.py | 19 ++-- 3 files changed, 177 insertions(+), 58 deletions(-) diff --git a/notebooks/diorama/agent_example.py b/notebooks/diorama/agent_example.py index 04e48865..914306a7 100644 --- a/notebooks/diorama/agent_example.py +++ b/notebooks/diorama/agent_example.py @@ -1,41 +1,68 @@ import asyncio from diorama import Diorama from pathlib import Path - from dotenv import load_dotenv load_dotenv(Path("~/cua/.env.local").expanduser()) from agent import AgentLoop, ComputerAgent as Agent, LLM, LLMProvider +from PIL import Image +import rpack + +async def make_mosaic(dioramas): + sizes = [] + for d in dioramas: + size = await d.interface.get_screen_size() + sizes.append((size['width'], size['height'])) + positions = rpack.pack(sizes) + max_x = max(x + w for (x, y), (w, h) in zip(positions, sizes)) + max_y = max(y + h for (x, y), (w, h) in zip(positions, sizes)) + mosaic = Image.new("RGBA", (max_x, max_y), (30, 30, 30, 255)) + draw_positions = positions + return mosaic, draw_positions async def main(): # diorama's are virtual desktops, they allow you to control multiple apps at once - diorama1 = Diorama.create_from_apps("Terminal") + diorama1 = Diorama.create_from_apps("Safari") diorama2 = Diorama.create_from_apps("Notes") - diorama3 = Diorama.create_from_apps("Safari") - diorama4 = Diorama.create_from_apps("Calendar") - + diorama3 = Diorama.create_from_apps("Calculator") + diorama4 = Diorama.create_from_apps("Terminal") + # create agents agents = [ - Agent( - computer=diorama1, - model=LLM("openai", "computer-use-preview"), - loop=AgentLoop.OPENAI - ), - Agent(diorama2, LLM("anthropic", "claude-3-7-sonnet-20250219"), AgentLoop.ANTHROPIC), - Agent(diorama3, LLM("openai", "gpt-4.1-nano"), AgentLoop.OMNI), - Agent(diorama4, LLM("oaicompat", "tgi", os.getenv("UITARS_BASE_URL")), AgentLoop.UITARS) + diorama1.agent.openai(), + diorama2.agent.openai(), + diorama3.agent.openai(),modif + diorama4.agent.openai() ] - - tasks = [ - "In Terminal, run 'echo Hello World'", - "In Notes, create a new note with the title 'Test' and the content 'This is a test note.'", - "In Safari, go to https://www.google.com", - "In Calendar, create a new event with the title 'Test' and the content 'This is a test event.'" - ] - - async for response in asyncio.gather(*[agent.run(task) for agent, task in zip(agents, tasks)]): - print(response) - + dioramas = [diorama1, diorama2, diorama3, diorama4] + mosaic, draw_positions = await make_mosaic(dioramas) + mosaic.save(Path("~/cua/notebooks/app_screenshots/mosaic.png").expanduser()) + tasks = [ + "In Safari, find a cat picture", + "In Notes, make a note named 'Test' and draw an ASCII dog", + "In Calculator, add 2 + 2", + "In Terminal, type 'ls' and press enter" + ] + + async def run_agent(agent, task, diorama_idx): + diorama = dioramas[diorama_idx] + + # start with a screenshot + screenshot = await diorama.interface.screenshot(as_bytes=False) + mosaic.paste(screenshot, draw_positions[diorama_idx]) + mosaic.save(Path("~/cua/notebooks/app_screenshots/mosaic.png").expanduser()) + + async for response in agent.run(task): + print(response) + + # update mosaic + screenshot = await diorama.interface.screenshot(as_bytes=False) + mosaic.paste(screenshot, draw_positions[diorama_idx]) + mosaic.save(Path("~/cua/notebooks/app_screenshots/mosaic.png").expanduser()) + + # run agents + await asyncio.gather(*[run_agent(agent, task, idx) for idx, (agent, task) in enumerate(zip(agents, tasks))]) + if __name__ == "__main__": asyncio.run(main()) \ No newline at end of file diff --git a/notebooks/diorama/diorama.py b/notebooks/diorama/diorama.py index f10e2a84..f4e5cbdf 100644 --- a/notebooks/diorama/diorama.py +++ b/notebooks/diorama/diorama.py @@ -7,7 +7,7 @@ import logging import sys import io from typing import Union -from PIL import Image +from PIL import Image, ImageDraw from draw import capture_all_apps, AppActivationContext, get_frontmost_and_active_app, get_all_windows, get_running_apps @@ -107,7 +107,7 @@ class Diorama: with focus_context: try: if action == "screenshot": - app_whitelist = list(args["app_list"]) + ["Window Server", "Dock"] + app_whitelist = list(args["app_list"]) logger.info(f"Taking screenshot for apps: {app_whitelist}") result, img = capture_all_apps( app_whitelist=app_whitelist, @@ -180,7 +180,11 @@ class Diorama: "arguments": {"app_list": self._diorama.app_list, **(arguments or {})}, "future": future }) - return await future + try: + return await future + except asyncio.CancelledError: + logger.warning(f"Command was cancelled: {action}") + return None async def screenshot(self, as_bytes: bool = True) -> Union[bytes, Image]: result, img = await self._send_cmd("screenshot") @@ -246,18 +250,28 @@ class Diorama: if not self._scene_hitboxes: await self.screenshot() # get hitboxes # Try all hitboxes - for h in self._scene_hitboxes: - rect = h.get("hitbox") - if not rect or len(rect) != 4: + for h in self._scene_hitboxes[::-1]: + rect_from = h.get("hitbox") + rect_to = h.get("target") + if not rect_from or len(rect_from) != 4: continue - x0, y0, x1, y1 = rect - width = x1 - x0 - height = y1 - y0 - abs_x = x0 + x * width - abs_y = y0 + y * height - # Check if (abs_x, abs_y) is inside this hitbox - if x0 <= abs_x <= x1 and y0 <= abs_y <= y1: - return abs_x, abs_y + + # check if (x, y) is inside rect_from + x0, y0, x1, y1 = rect_from + if x0 <= x <= x1 and y0 <= y <= y1: + logger.info(f"Found hitbox: {h}") + # remap (x, y) to rect_to + tx0, ty0, tx1, ty1 = rect_to + + # calculate offset from x0, y0 + offset_x = x - x0 + offset_y = y - y0 + + # remap offset to rect_to + tx = tx0 + offset_x + ty = ty0 + offset_y + + return tx, ty return x, y async def to_screenshot_coordinates(self, x: float, y: float) -> tuple[float, float]: @@ -272,23 +286,34 @@ class Diorama: """ if not self._scene_hitboxes: await self.screenshot() # get hitboxes - for h in self._scene_hitboxes: - rect = h.get("target") - if not rect or len(rect) != 4: + # Try all hitboxes + for h in self._scene_hitboxes[::-1]: + rect_from = h.get("target") + rect_to = h.get("hitbox") + if not rect_from or len(rect_from) != 4: continue - x0, y0, x1, y1 = rect - width = x1 - x0 - height = y1 - y0 + + # check if (x, y) is inside rect_from + x0, y0, x1, y1 = rect_from if x0 <= x <= x1 and y0 <= y <= y1: - rel_x = (x - x0) / width if width else 0.0 - rel_y = (y - y0) / height if height else 0.0 - return rel_x, rel_y + # remap (x, y) to rect_to + tx0, ty0, tx1, ty1 = rect_to + + # calculate offset from x0, y0 + offset_x = x - x0 + offset_y = y - y0 + + # remap offset to rect_to + tx = tx0 + offset_x + ty = ty0 + offset_y + + return tx, ty return x, y -async def main(): - from PIL import Image, ImageDraw - from draw import capture_all_apps +import pyautogui +import time +async def main(): desktop1 = Diorama.create_from_apps(["Discord", "Notes"]) desktop2 = Diorama.create_from_apps(["Terminal"]) @@ -297,7 +322,71 @@ async def main(): img1.save("app_screenshots/desktop1.png") img2.save("app_screenshots/desktop2.png") + # Initialize Diorama desktop + desktop3 = Diorama.create_from_apps("Safari") + screen_size = await desktop3.interface.get_screen_size() + print(screen_size) + # Take initial screenshot + img = await desktop3.interface.screenshot(as_bytes=False) + img.save("app_screenshots/desktop3.png") + + # Prepare hitboxes and draw on the single screenshot + hitboxes = desktop3.interface._scene_hitboxes[::-1] + base_img = img.copy() + draw = ImageDraw.Draw(base_img) + for h in hitboxes: + rect = h.get("hitbox") + if not rect or len(rect) != 4: + continue + draw.rectangle(rect, outline="red", width=2) + + # Track and draw mouse position in real time (single screenshot size) + last_mouse_pos = None + print("Tracking mouse... Press Ctrl+C to stop.") + try: + while True: + mouse_x, mouse_y = pyautogui.position() + if last_mouse_pos != (mouse_x, mouse_y): + last_mouse_pos = (mouse_x, mouse_y) + # Map to screenshot coordinates + sx, sy = await desktop3.interface.to_screenshot_coordinates(mouse_x, mouse_y) + # Draw on a copy of the screenshot + frame = base_img.copy() + frame_draw = ImageDraw.Draw(frame) + frame_draw.ellipse((sx-5, sy-5, sx+5, sy+5), fill="blue", outline="blue") + # Save the frame + frame.save("app_screenshots/desktop3_mouse.png") + print(f"Mouse at screen ({mouse_x}, {mouse_y}) -> screenshot ({sx:.1f}, {sy:.1f})") + time.sleep(0.05) # Throttle updates to ~20 FPS + except KeyboardInterrupt: + print("Stopped tracking.") + + draw.text((rect[0], rect[1]), str(idx), fill="red") + + canvas.save("app_screenshots/desktop3_hitboxes.png") + + + + # move mouse in a square spiral around the screen + import math + import random + + step = 20 # pixels per move + dot_radius = 10 + width = screen_size["width"] + height = screen_size["height"] + x, y = 0, 10 + + while x < width and y < height: + await desktop3.interface.move_cursor(x, y) + img = await desktop3.interface.screenshot(as_bytes=False) + draw = ImageDraw.Draw(img) + draw.ellipse((x-dot_radius, y-dot_radius, x+dot_radius, y+dot_radius), fill="red") + img.save("current.png") + await asyncio.sleep(0.03) + x += step + y = math.sin(x / width * math.pi * 2) * 50 + 25 if __name__ == "__main__": asyncio.run(main()) diff --git a/notebooks/diorama/draw.py b/notebooks/diorama/draw.py index 3903479c..d4097164 100644 --- a/notebooks/diorama/draw.py +++ b/notebooks/diorama/draw.py @@ -465,7 +465,7 @@ def draw_desktop_screenshot(app_whitelist: List[str] = None, all_windows: List[D _draw_layer(cg_context, first_pass_windows, app_source_rect, app_target_rect) hitboxes.append({ - "hitbox": [0, 0, app_bounds["width"], app_bounds["height"]], + "hitbox": [0, menubar_bounds["height"], app_bounds["width"], menubar_bounds["height"] + app_bounds["height"]], "target": [ app_source_rect.origin.x, app_source_rect.origin.y, @@ -505,6 +505,8 @@ def draw_desktop_screenshot(app_whitelist: List[str] = None, all_windows: List[D elif item["subrole"] == "AXMinimizedWindowDockItem": if not any(window["name"] == item["title"] and window["role"] == "app" and window["owner"] in app_whitelist for window in all_windows): continue + elif item["subrole"] == "AXFolderDockItem": + continue # Preserve unscaled (original) source position and size before any modification hitbox_position = source_position @@ -1031,6 +1033,12 @@ def capture_all_apps(save_to_disk: bool = False, app_whitelist: List[str] = None # DEBUG: Save hitboxes to disk if desktop_screenshot and save_to_disk and output_dir: + desktop_path = os.path.join(output_dir, "desktop.png") + desktop_screenshot.save(desktop_path) + result["desktop_screenshot"] = desktop_path + + logger.info(f"Saved desktop screenshot to {desktop_path}") + if app_whitelist: # Take screenshot without whitelist desktop_screenshot_full, hitboxes_full = draw_desktop_screenshot( @@ -1053,10 +1061,6 @@ def capture_all_apps(save_to_disk: bool = False, app_whitelist: List[str] = None combined.save(side_by_side_path) result["side_by_side_hitboxes"] = side_by_side_path else: - desktop_path = os.path.join(output_dir, "desktop.png") - desktop_screenshot.save(desktop_path) - result["desktop_screenshot"] = desktop_path - # Overlay hitboxes using new function hitbox_img = _draw_hitboxes(desktop_screenshot.copy(), hitboxes, key="hitbox") hitbox_path = os.path.join(output_dir, "hitboxes.png") @@ -1099,9 +1103,8 @@ async def run_capture(): groups.append(group) screenshots = [] for group in groups: - app_whitelist = group + ["Window Server", "Dock"] - print(f"Capturing for apps: {app_whitelist}") - _, img = capture_all_apps(app_whitelist=app_whitelist) + print(f"Capturing for apps: {group}") + _, img = capture_all_apps(app_whitelist=group) if img: screenshots.append((group, img)) if not screenshots: