async def _run_agent(agent, task):
    """Run one agent on one task and print every response it produces.

    The result of ``agent.run(task)`` is consumed with ``async for`` when it
    exposes ``__aiter__`` (a stream of responses) and awaited otherwise (a
    single result), so the example works with either style of ``run``.

    Args:
        agent: Object exposing ``run(task)``.
        task: Natural-language instruction handed to the agent.
    """
    outcome = agent.run(task)
    if hasattr(outcome, "__aiter__"):
        async for response in outcome:
            print(response)
    else:
        print(await outcome)


async def main():
    """Drive four agents concurrently, one per diorama.

    Dioramas are virtual desktops; each one is created from a single app
    here, and each agent is paired with the task at the same index.
    """
    # Function-scope import keeps this block self-contained: the UI-TARS
    # endpoint is read from the environment below.
    import os

    # Dioramas are virtual desktops, they allow you to control multiple apps at once
    diorama1 = Diorama.create_from_apps("Terminal")
    diorama2 = Diorama.create_from_apps("Notes")
    diorama3 = Diorama.create_from_apps("Safari")
    diorama4 = Diorama.create_from_apps("Calendar")

    # Providers are spelled with LLMProvider members and keyword arguments,
    # the same form AgentFactory uses in diorama.py.
    agents = [
        Agent(
            computer=diorama1,
            model=LLM(provider=LLMProvider.OPENAI, name="computer-use-preview"),
            loop=AgentLoop.OPENAI,
        ),
        Agent(
            computer=diorama2,
            model=LLM(
                provider=LLMProvider.ANTHROPIC,
                name="claude-3-7-sonnet-20250219",
            ),
            loop=AgentLoop.ANTHROPIC,
        ),
        Agent(
            computer=diorama3,
            model=LLM(provider=LLMProvider.OPENAI, name="gpt-4.1-nano"),
            loop=AgentLoop.OMNI,
        ),
        Agent(
            computer=diorama4,
            model=LLM(
                provider=LLMProvider.OAICOMPAT,
                name="tgi",
                provider_base_url=os.getenv("UITARS_BASE_URL"),
            ),
            loop=AgentLoop.UITARS,
        ),
    ]

    tasks = [
        "In Terminal, run 'echo Hello World'",
        "In Notes, create a new note with the title 'Test' and the content 'This is a test note.'",
        "In Safari, go to https://www.google.com",
        "In Calendar, create a new event with the title 'Test' and the content 'This is a test event.'",
    ]

    # asyncio.gather() returns an awaitable, not an async iterator, so it is
    # awaited; each wrapped coroutine prints its own agent's responses.
    await asyncio.gather(
        *(_run_agent(agent, task) for agent, task in zip(agents, tasks))
    )
class AgentFactory:
    """Builds ComputerAgent instances bound to one diorama's computer.

    The diorama's ``computer`` attribute is looked up when an agent is
    created, not when the factory is constructed, so the factory may be
    created before that attribute exists on the diorama.
    """

    def __init__(self, diorama):
        self.diorama = diorama

    def create_agent(self, loop: AgentLoop, model: LLM):
        """Return a ComputerAgent for this diorama using ``loop`` and ``model``."""
        target_computer = self.diorama.computer
        return ComputerAgent(computer=target_computer, loop=loop, model=model)

    def openai(self):
        """Agent on the OPENAI loop with the "computer-use-preview" model."""
        model = LLM(provider=LLMProvider.OPENAI, name="computer-use-preview")
        return self.create_agent(AgentLoop.OPENAI, model)

    def anthropic(self):
        """Agent on the ANTHROPIC loop; no model name is passed to LLM."""
        model = LLM(provider=LLMProvider.ANTHROPIC)
        return self.create_agent(AgentLoop.ANTHROPIC, model)

    def openai_omni(self, model_name):
        """Agent on the OMNI loop backed by the OpenAI model ``model_name``."""
        model = LLM(provider=LLMProvider.OPENAI, name=model_name)
        return self.create_agent(AgentLoop.OMNI, model)

    def uitars(self):
        """Agent on the UITARS loop; base URL comes from UITARS_BASE_URL."""
        base_url = os.getenv("UITARS_BASE_URL")
        model = LLM(
            provider=LLMProvider.OAICOMPAT,
            name="tgi",
            provider_base_url=base_url,
        )
        return self.create_agent(AgentLoop.UITARS, model)
_ensure_scheduler(cls): @@ -43,68 +96,157 @@ class Diorama: args = cmd.get("arguments", {}) future = cmd.get("future") logger.info(f"Processing command: {action} | args={args}") - if action == "screenshot": + + app_whitelist = args.get("app_list", []) + + all_windows = get_all_windows() + running_apps = get_running_apps() + frontmost_app, active_app_to_use, active_app_pid = get_frontmost_and_active_app(all_windows, running_apps, app_whitelist) + focus_context = AppActivationContext(active_app_pid, active_app_to_use, logger) + + with focus_context: try: - app_whitelist = list(args["app_list"]) + ["Window Server", "Dock"] - logger.info(f"Taking screenshot for apps: {app_whitelist}") - result, img = capture_all_apps( - save_to_disk=args.get("save_to_disk", False), - app_whitelist=app_whitelist, - output_dir=args.get("output_dir"), - take_focus=args.get("take_focus", True) - ) - logger.info("Screenshot complete.") - if future: - future.set_result((result, img)) + if action == "screenshot": + app_whitelist = list(args["app_list"]) + ["Window Server", "Dock"] + logger.info(f"Taking screenshot for apps: {app_whitelist}") + result, img = capture_all_apps( + app_whitelist=app_whitelist, + save_to_disk=False, + take_focus=False + ) + logger.info("Screenshot complete.") + if future: + future.set_result((result, img)) + # Mouse actions + elif action in ["left_click", "right_click", "double_click", "move_cursor", "drag_to"]: + x = args.get("x") + y = args.get("y") + duration = args.get("duration", 0.5) + if action == "left_click": + await automation_handler.left_click(x, y) + elif action == "right_click": + await automation_handler.right_click(x, y) + elif action == "double_click": + await automation_handler.double_click(x, y) + elif action == "move_cursor": + await automation_handler.move_cursor(x, y) + elif action == "drag_to": + await automation_handler.drag_to(x, y, duration=duration) + if future: + future.set_result(None) + # Keyboard actions + elif action == "type_text": 
+ text = args.get("text") + await automation_handler.type_text(text) + if future: + future.set_result(None) + elif action == "press_key": + key = args.get("key") + await automation_handler.press_key(key) + if future: + future.set_result(None) + elif action == "hotkey": + keys = args.get("keys", []) + await automation_handler.hotkey(keys) + if future: + future.set_result(None) + elif action == "get_cursor_position": + pos = await automation_handler.get_cursor_position() + if future: + future.set_result(pos) + else: + logger.warning(f"Unknown action: {action}") + if future: + future.set_exception(ValueError(f"Unknown action: {action}")) except Exception as e: - logger.error(f"Exception during screenshot: {e}", exc_info=True) + logger.error(f"Exception during {action}: {e}", exc_info=True) if future: future.set_exception(e) - else: - logger.warning(f"Unknown action: {action}") - if future: - future.set_exception(ValueError(f"Unknown action: {action}")) - @classmethod - def create_from_apps(cls, app_list): - cls._ensure_scheduler() - return cls(app_list) - - class Interface: + class Interface(): def __init__(self, diorama): self._diorama = diorama - self.hitboxes = [] + + self._scene_hitboxes = [] + self._scene_size = None - async def screenshot(self, save_to_disk=False, output_dir=None, take_focus=True): + async def _send_cmd(self, action, arguments=None): Diorama._ensure_scheduler() loop = asyncio.get_event_loop() future = loop.create_future() - logger.info(f"Enqueuing screenshot command for apps: {self._diorama.app_list}") + logger.info(f"Enqueuing {action} command for apps: {self._diorama.app_list}") await Diorama._scheduler_queue.put({ - "action": "screenshot", - "arguments": { - "app_list": self._diorama.app_list, - "save_to_disk": save_to_disk, - "output_dir": output_dir, - "take_focus": take_focus - }, + "action": action, + "arguments": {"app_list": self._diorama.app_list, **(arguments or {})}, "future": future }) - result, img = await future - # Store hitboxes 
after screenshot - self.hitboxes = result.get("hitboxes", []) - return result, img + return await future + + async def screenshot(self, as_bytes: bool = True) -> Union[bytes, Image]: + result, img = await self._send_cmd("screenshot") + self._scene_hitboxes = result.get("hitboxes", []) + self._scene_size = img.size + + if as_bytes: + # PIL Image to bytes + img_byte_arr = io.BytesIO() + img.save(img_byte_arr, format="PNG") + img_byte_arr = img_byte_arr.getvalue() + return img_byte_arr + else: + return img + + async def left_click(self, x, y): + sx, sy = await self.to_screen_coordinates(x, y) + await self._send_cmd("left_click", {"x": sx, "y": sy}) + + async def right_click(self, x, y): + sx, sy = await self.to_screen_coordinates(x, y) + await self._send_cmd("right_click", {"x": sx, "y": sy}) + + async def double_click(self, x, y): + sx, sy = await self.to_screen_coordinates(x, y) + await self._send_cmd("double_click", {"x": sx, "y": sy}) + + async def move_cursor(self, x, y): + sx, sy = await self.to_screen_coordinates(x, y) + await self._send_cmd("move_cursor", {"x": sx, "y": sy}) + + async def drag_to(self, x, y, duration=0.5): + sx, sy = await self.to_screen_coordinates(x, y) + await self._send_cmd("drag_to", {"x": sx, "y": sy, "duration": duration}) + + async def get_cursor_position(self): + return await self._send_cmd("get_cursor_position") + + async def type_text(self, text): + await self._send_cmd("type_text", {"text": text}) + + async def press_key(self, key): + await self._send_cmd("press_key", {"key": key}) + + async def hotkey(self, *keys): + await self._send_cmd("hotkey", {"keys": list(keys)}) + + async def get_screen_size(self) -> dict[str, int]: + if not self._scene_size: + await self.screenshot() + return { "width": self._scene_size[0], "height": self._scene_size[1] } async def to_screen_coordinates(self, x: float, y: float) -> tuple[float, float]: + """Convert screenshot coordinates to screen coordinates. 
+ + Args: + x: X absolute coordinate in screenshot space + y: Y absolute coordinate in screenshot space + + Returns: + tuple[float, float]: (x, y) absolute coordinates in screen space """ - Convert screenshot-relative coordinates (x, y) to absolute screen coordinates. - Find the first hitbox whose 'hitbox' contains the mapped (abs_x, abs_y). - If none found, return input. - """ - if not self.hitboxes: + if not self._scene_hitboxes: await self.screenshot() # get hitboxes # Try all hitboxes - for h in self.hitboxes: + for h in self._scene_hitboxes: rect = h.get("hitbox") if not rect or len(rect) != 4: continue @@ -119,14 +261,18 @@ class Diorama: return x, y async def to_screenshot_coordinates(self, x: float, y: float) -> tuple[float, float]: + """Convert screen coordinates to screenshot coordinates. + + Args: + x: X absolute coordinate in screen space + y: Y absolute coordinate in screen space + + Returns: + tuple[float, float]: (x, y) absolute coordinates in screenshot space """ - Convert absolute screen coordinates (x, y) to screenshot-relative coordinates (normalized to [0, 1]). - Find the first hitbox whose 'target' contains (x, y). - If none found, return input. 
- """ - if not self.hitboxes: + if not self._scene_hitboxes: await self.screenshot() # get hitboxes - for h in self.hitboxes: + for h in self._scene_hitboxes: rect = h.get("target") if not rect or len(rect) != 4: continue @@ -144,60 +290,14 @@ async def main(): from draw import capture_all_apps desktop1 = Diorama.create_from_apps(["Discord", "Notes"]) + desktop2 = Diorama.create_from_apps(["Terminal"]) - # Take full screen screenshot (no app whitelist) - result_full, img_full = capture_all_apps() - # Take desktop1 screenshot - result1, img1 = await desktop1.interface.screenshot() + img1 = await desktop1.interface.screenshot(as_bytes=False) + img2 = await desktop2.interface.screenshot(as_bytes=False) - # Pick a sample normalized screenshot coordinate - test_screenshot_coord = (0.5, 0.5) # center - # Convert to screen coordinates using desktop1 (should map to full screenshot) - screen_coord = await desktop1.interface.to_screen_coordinates(*test_screenshot_coord) - # Convert back to screenshot coordinates on desktop1 - screenshot_coord_back = await desktop1.interface.to_screenshot_coordinates(*screen_coord) + img1.save("app_screenshots/desktop1.png") + img2.save("app_screenshots/desktop2.png") - # Draw on full screenshot: the mapped screen coordinate - img_full = img_full.convert("RGBA") - img1 = img1.convert("RGBA") - width_full, height_full = img_full.size - width1, height1 = img1.size - x_screen, y_screen = int(screen_coord[0]), int(screen_coord[1]) - x1, y1 = int(screenshot_coord_back[0] * width1), int(screenshot_coord_back[1] * height1) - - draw_full = ImageDraw.Draw(img_full) - r = 12 - draw_full.ellipse([(x_screen - r, y_screen - r), (x_screen + r, y_screen + r)], fill=(255,0,0,200), outline=(0,0,0,255)) - draw_full.text((x_screen + r, y_screen), "screen coord", fill=(255,0,0,255)) - - draw1 = ImageDraw.Draw(img1) - draw1.ellipse([(x1 - r, y1 - r), (x1 + r, y1 + r)], fill=(0,0,255,200), outline=(0,0,0,255)) - draw1.text((x1 + r, y1), f"screenshot coord", 
class DioramaComputer:
    """A minimal Computer-like interface for Diorama, compatible with ComputerAgent.

    Exposes ``_initialized``, ``run()`` and the async context-manager
    protocol (``__aenter__`` / ``__aexit__``), and re-exports the wrapped
    diorama's ``interface`` and ``agent`` attributes.
    """

    def __init__(self, diorama):
        """Wrap ``diorama``; it must already have ``interface`` and ``agent`` set."""
        self.diorama = diorama
        self.interface = self.diorama.interface
        self.agent = self.diorama.agent
        self._initialized = False

    async def __aenter__(self):
        """Mark the computer as initialized and return it."""
        # Kept for compatibility: if no loop is reported as running, install
        # a fresh one as the current event loop.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            asyncio.set_event_loop(asyncio.new_event_loop())
        self._initialized = True
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Complete the ``async with`` protocol; nothing is released here.

        Returns None so an exception raised inside the ``async with`` body
        propagates. ``_initialized`` is left unchanged.
        """
        return None

    async def run(self):
        """Initialize on first use and return this computer (compatibility stub)."""
        if not self._initialized:
            await self.__aenter__()
        return self