removed extra files

This commit is contained in:
Dillon DuPont
2025-05-31 12:40:02 -04:00
parent df9a080d03
commit 5846f0ef08
6 changed files with 0 additions and 1900 deletions

View File

@@ -1,3 +0,0 @@
from .diorama import Diorama
__all__ = ["Diorama"]

View File

@@ -1,68 +0,0 @@
import asyncio
from diorama import Diorama
from pathlib import Path
from dotenv import load_dotenv
load_dotenv(Path("~/cua/.env.local").expanduser())
from agent import AgentLoop, ComputerAgent as Agent, LLM, LLMProvider
from PIL import Image
import rpack
async def make_mosaic(dioramas):
    """Create a blank canvas big enough to tile every diorama's screen.

    Each diorama is asked for its screen size one after another, the sizes
    are packed with ``rpack.pack``, and an RGBA image spanning the packed
    layout is created with a dark grey, fully opaque background.

    Args:
        dioramas: Objects whose ``interface.get_screen_size()`` coroutine
            returns a mapping with ``"width"`` and ``"height"`` keys.

    Returns:
        A ``(mosaic, positions)`` tuple: the new image, and the positions
        returned by ``rpack.pack`` (paired index-for-index with the sizes).
    """
    tile_sizes = []
    for diorama in dioramas:
        dims = await diorama.interface.get_screen_size()
        tile_sizes.append((dims["width"], dims["height"]))

    origins = rpack.pack(tile_sizes)

    # The canvas must reach the far edge of the right-most / bottom-most tile.
    right_edges = [left + width for (left, _), (width, _) in zip(origins, tile_sizes)]
    bottom_edges = [top + height for (_, top), (_, height) in zip(origins, tile_sizes)]

    canvas = Image.new("RGBA", (max(right_edges), max(bottom_edges)), (30, 30, 30, 255))
    return canvas, origins
async def main():
    """Run one agent per diorama concurrently and mirror their screens into a mosaic.

    Four dioramas are created (Safari, Notes, Calculator, Terminal), each
    gets an agent from ``.agent.openai()``, and every agent is given one
    task. A mosaic image is written to disk up front and re-written each
    time an agent yields a response, so the file always shows the most
    recent screenshot of each diorama.
    """
    mosaic_path = Path("~/cua/notebooks/app_screenshots/mosaic.png").expanduser()

    # Dioramas are virtual desktops; they allow you to control multiple apps at once.
    dioramas = [
        Diorama.create_from_apps(app_name)
        for app_name in ("Safari", "Notes", "Calculator", "Terminal")
    ]

    # Create one agent per diorama, in the same order as `dioramas`.
    agents = [diorama.agent.openai() for diorama in dioramas]

    mosaic, draw_positions = await make_mosaic(dioramas)
    mosaic.save(mosaic_path)

    tasks = [
        "In Safari, find a cat picture",
        "In Notes, make a note named 'Test' and draw an ASCII dog",
        "In Calculator, add 2 + 2",
        "In Terminal, type 'ls' and press enter",
    ]

    async def refresh_tile(diorama_idx):
        """Re-capture one diorama, paste it into its slot, and save the mosaic."""
        screenshot = await dioramas[diorama_idx].interface.screenshot(as_bytes=False)
        mosaic.paste(screenshot, draw_positions[diorama_idx])
        mosaic.save(mosaic_path)

    async def run_agent(agent, task, diorama_idx):
        """Run a single agent on its task, refreshing its tile after every response."""
        # Start with a screenshot so the tile is populated before the agent acts.
        await refresh_tile(diorama_idx)
        async for response in agent.run(task):
            print(response)
            # Update the mosaic with the state the agent just produced.
            await refresh_tile(diorama_idx)

    # Run all agents concurrently; agents and tasks are paired by position.
    await asyncio.gather(
        *(run_agent(agent, task, idx) for idx, (agent, task) in enumerate(zip(agents, tasks)))
    )
# Script entry point: when executed directly, run the main() coroutine via asyncio.run.
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,392 +0,0 @@
#!/usr/bin/env python3
"""Diorama: A virtual desktop manager for macOS"""
import os
import asyncio
import logging
import sys
import io
from typing import Union
from PIL import Image, ImageDraw
from draw import capture_all_apps, AppActivationContext, get_frontmost_and_active_app, get_all_windows, get_running_apps
from diorama_computer import DioramaComputer
from computer_server.handlers.macos import *
from agent import ComputerAgent, LLM, LLMProvider, AgentLoop
# simple, nicely formatted logging
# Configures logging at import time: INFO and above are written to stdout
# as "[HH:MM:SS] [LEVEL] message".
logging.basicConfig(
level=logging.INFO,
format='[%(asctime)s] [%(levelname)s] %(message)s',
datefmt='%H:%M:%S',
stream=sys.stdout
)
# Module logger used by the scheduler and the Interface methods below.
logger = logging.getLogger("diorama.virtual_desktop")
# Single module-level handler whose mouse/keyboard coroutines the Diorama
# scheduler loop awaits. NOTE(review): MacOSAutomationHandler presumably comes
# from the star import of computer_server.handlers.macos — confirm.
automation_handler = MacOSAutomationHandler()
class AgentFactory:
    """Builds ``ComputerAgent`` instances bound to one diorama's computer."""

    def __init__(self, diorama):
        # The diorama whose `.computer` every created agent will control.
        self.diorama = diorama

    def create_agent(self, loop: AgentLoop, model: LLM):
        """Return a ``ComputerAgent`` using the given loop and model on this diorama's computer."""
        target_computer = self.diorama.computer
        return ComputerAgent(computer=target_computer, loop=loop, model=model)

    def openai(self):
        """Agent on the OPENAI loop using the "computer-use-preview" model."""
        model = LLM(provider=LLMProvider.OPENAI, name="computer-use-preview")
        return self.create_agent(AgentLoop.OPENAI, model)

    def anthropic(self):
        """Agent on the ANTHROPIC loop; no model name is passed to ``LLM``."""
        model = LLM(provider=LLMProvider.ANTHROPIC)
        return self.create_agent(AgentLoop.ANTHROPIC, model)

    def openai_omni(self, model_name):
        """Agent on the OMNI loop backed by the OpenAI provider and ``model_name``."""
        model = LLM(provider=LLMProvider.OPENAI, name=model_name)
        return self.create_agent(AgentLoop.OMNI, model)

    def uitars(self):
        """Agent on the UITARS loop; the base URL is read from ``UITARS_BASE_URL``."""
        base_url = os.getenv("UITARS_BASE_URL")
        model = LLM(
            provider=LLMProvider.OAICOMPAT,
            name="tgi",
            provider_base_url=base_url,
        )
        return self.create_agent(AgentLoop.UITARS, model)
class Diorama:
    """A virtual desktop restricted to a fixed list of apps.

    All instances share one class-wide ``asyncio.Queue`` that is drained by a
    single scheduler task, so commands issued by different dioramas are
    executed one at a time. Each command carries a future that the scheduler
    resolves with the command's result or exception.
    """

    _scheduler_queue = None
    _scheduler_task = None
    _loop = None
    _scheduler_started = False

    # Pointer actions take (x, y) and are named exactly like the automation
    # handler coroutine that implements them.
    _POINTER_ACTIONS = ("left_click", "right_click", "double_click", "move_cursor", "drag_to")
    _KNOWN_ACTIONS = (
        "screenshot",
        *_POINTER_ACTIONS,
        "type_text",
        "press_key",
        "hotkey",
        "get_cursor_position",
    )

    @classmethod
    def create_from_apps(cls, *args) -> "DioramaComputer":
        """Create a diorama for the given app names and return its computer.

        Args:
            *args: App names, e.g. ``create_from_apps("Discord", "Notes")``.
                A single list or tuple of names is also accepted and is
                flattened, so ``create_from_apps(["Discord", "Notes"])``
                produces the same app list instead of a nested one.

        Returns:
            The ``DioramaComputer`` wrapping the new diorama.
        """
        if len(args) == 1 and isinstance(args[0], (list, tuple)):
            args = tuple(args[0])
        cls._ensure_scheduler()
        return cls(args).computer

    def __init__(self, app_list):
        self.app_list = app_list
        self.agent = AgentFactory(self)
        self.interface = self.Interface(self)
        # Created last: DioramaComputer reads `interface` and `agent` off this object.
        self.computer = DioramaComputer(self)
        self.focus_context = None

    @classmethod
    def _ensure_scheduler(cls):
        """Start the shared scheduler task if it is not currently running.

        A scheduler whose task has already finished (for example because the
        event loop it ran on was shut down) is replaced; otherwise commands
        enqueued afterwards would never be processed.
        """
        task = cls._scheduler_task
        if cls._scheduler_started and task is not None and not task.done():
            return
        logger.info("Starting Diorama scheduler loop…")
        cls._scheduler_queue = asyncio.Queue()
        cls._loop = asyncio.get_event_loop()
        cls._scheduler_task = cls._loop.create_task(cls._scheduler_loop())
        cls._scheduler_started = True

    @staticmethod
    def _settle(future, result=None, error=None):
        """Resolve ``future`` with a result or an error, if it is still pending.

        Futures that are missing or already done (e.g. cancelled by the
        caller) are left alone, because setting them again raises
        ``InvalidStateError``.
        """
        if future is None or future.done():
            return
        if error is not None:
            future.set_exception(error)
        else:
            future.set_result(result)

    @classmethod
    async def _dispatch(cls, action, args):
        """Execute one known action and return the value for its future."""
        if action == "screenshot":
            app_whitelist = list(args["app_list"])
            logger.info(f"Taking screenshot for apps: {app_whitelist}")
            result, img = capture_all_apps(
                app_whitelist=app_whitelist,
                save_to_disk=False,
                take_focus=False,
            )
            logger.info("Screenshot complete.")
            return result, img

        # Mouse actions
        if action in cls._POINTER_ACTIONS:
            x = args.get("x")
            y = args.get("y")
            if action == "drag_to":
                await automation_handler.drag_to(x, y, duration=args.get("duration", 0.5))
            else:
                await getattr(automation_handler, action)(x, y)
            return None

        # Keyboard actions
        if action == "type_text":
            await automation_handler.type_text(args.get("text"))
            return None
        if action == "press_key":
            await automation_handler.press_key(args.get("key"))
            return None
        if action == "hotkey":
            await automation_handler.hotkey(args.get("keys", []))
            return None

        if action == "get_cursor_position":
            return await automation_handler.get_cursor_position()

        raise ValueError(f"Unknown action: {action}")

    @classmethod
    async def _scheduler_loop(cls):
        """Process queued commands forever, one at a time.

        Any exception raised while preparing or executing a command is logged
        and delivered through that command's future, so a single failing
        command cannot stop the scheduler.
        """
        while True:
            cmd = await cls._scheduler_queue.get()
            action = cmd.get("action")
            args = cmd.get("arguments", {})
            future = cmd.get("future")

            # The requester gave up before we got here; do not act on its behalf.
            if future is not None and future.cancelled():
                logger.info(f"Skipping cancelled command: {action}")
                continue

            logger.info(f"Processing command: {action} | args={args}")

            if action not in cls._KNOWN_ACTIONS:
                logger.warning(f"Unknown action: {action}")
                cls._settle(future, error=ValueError(f"Unknown action: {action}"))
                continue

            try:
                app_whitelist = args.get("app_list", [])
                all_windows = get_all_windows()
                running_apps = get_running_apps()
                _, active_app_to_use, active_app_pid = get_frontmost_and_active_app(
                    all_windows, running_apps, app_whitelist
                )
                focus_context = AppActivationContext(active_app_pid, active_app_to_use, logger)
                with focus_context:
                    result = await cls._dispatch(action, args)
            except Exception as e:
                logger.error(f"Exception during {action}: {e}", exc_info=True)
                cls._settle(future, error=e)
            else:
                cls._settle(future, result=result)

    class Interface():
        """Per-diorama command API; every call is routed through the shared scheduler."""

        def __init__(self, diorama):
            self._diorama = diorama
            # Hitboxes and image size reported by the most recent screenshot.
            self._scene_hitboxes = []
            self._scene_size = None

        async def _send_cmd(self, action, arguments=None):
            """Enqueue a command for this diorama's apps and wait for its result.

            Raises:
                asyncio.CancelledError: If the awaiting task is cancelled. The
                    cancellation is logged and re-raised so that task
                    cancellation and timeouts keep working for callers.
                Exception: Whatever the scheduler recorded for the command.
            """
            Diorama._ensure_scheduler()
            future = asyncio.get_running_loop().create_future()
            logger.info(f"Enqueuing {action} command for apps: {self._diorama.app_list}")
            await Diorama._scheduler_queue.put({
                "action": action,
                "arguments": {"app_list": self._diorama.app_list, **(arguments or {})},
                "future": future,
            })
            try:
                return await future
            except asyncio.CancelledError:
                logger.warning(f"Command was cancelled: {action}")
                raise

        async def screenshot(self, as_bytes: bool = True) -> "Union[bytes, Image.Image]":
            """Capture this diorama and refresh the cached hitboxes and scene size.

            Args:
                as_bytes: When true, return PNG-encoded bytes; otherwise
                    return the image object itself.
            """
            result, img = await self._send_cmd("screenshot")
            self._scene_hitboxes = result.get("hitboxes", [])
            self._scene_size = img.size
            if not as_bytes:
                return img
            # PIL Image to bytes
            buffer = io.BytesIO()
            img.save(buffer, format="PNG")
            return buffer.getvalue()

        async def left_click(self, x, y):
            sx, sy = await self.to_screen_coordinates(x, y)
            await self._send_cmd("left_click", {"x": sx, "y": sy})

        async def right_click(self, x, y):
            sx, sy = await self.to_screen_coordinates(x, y)
            await self._send_cmd("right_click", {"x": sx, "y": sy})

        async def double_click(self, x, y):
            sx, sy = await self.to_screen_coordinates(x, y)
            await self._send_cmd("double_click", {"x": sx, "y": sy})

        async def move_cursor(self, x, y):
            sx, sy = await self.to_screen_coordinates(x, y)
            await self._send_cmd("move_cursor", {"x": sx, "y": sy})

        async def drag_to(self, x, y, duration=0.5):
            sx, sy = await self.to_screen_coordinates(x, y)
            await self._send_cmd("drag_to", {"x": sx, "y": sy, "duration": duration})

        async def get_cursor_position(self):
            return await self._send_cmd("get_cursor_position")

        async def type_text(self, text):
            await self._send_cmd("type_text", {"text": text})

        async def press_key(self, key):
            await self._send_cmd("press_key", {"key": key})

        async def hotkey(self, *keys):
            await self._send_cmd("hotkey", {"keys": list(keys)})

        async def get_screen_size(self) -> dict[str, int]:
            """Return the size of the last screenshot, taking one first if none is cached."""
            if not self._scene_size:
                await self.screenshot()
            return {"width": self._scene_size[0], "height": self._scene_size[1]}

        async def _remap(self, x, y, from_key, to_key):
            """Translate (x, y) from the ``from_key`` rectangle space to ``to_key``.

            Hitboxes are tried from last to first. An entry is used only when
            both rectangles have four values and the source rectangle contains
            the point; malformed entries are skipped instead of raising.

            Returns:
                ``(hitbox, (tx, ty))`` for the first match, else ``None``.
            """
            if not self._scene_hitboxes:
                await self.screenshot()  # get hitboxes
            for h in self._scene_hitboxes[::-1]:
                rect_from = h.get(from_key)
                rect_to = h.get(to_key)
                if not rect_from or len(rect_from) != 4:
                    continue
                if not rect_to or len(rect_to) != 4:
                    continue
                x0, y0, x1, y1 = rect_from
                if x0 <= x <= x1 and y0 <= y <= y1:
                    # Keep the offset from the source origin; no scaling is applied.
                    return h, (rect_to[0] + (x - x0), rect_to[1] + (y - y0))
            return None

        async def to_screen_coordinates(self, x: float, y: float) -> tuple[float, float]:
            """Convert screenshot coordinates to screen coordinates.

            Args:
                x: X absolute coordinate in screenshot space
                y: Y absolute coordinate in screenshot space

            Returns:
                tuple[float, float]: (x, y) absolute coordinates in screen
                space; the input is returned unchanged when no hitbox matches.
            """
            match = await self._remap(x, y, "hitbox", "target")
            if match is None:
                return x, y
            hitbox, point = match
            logger.info(f"Found hitbox: {hitbox}")
            return point

        async def to_screenshot_coordinates(self, x: float, y: float) -> tuple[float, float]:
            """Convert screen coordinates to screenshot coordinates.

            Args:
                x: X absolute coordinate in screen space
                y: Y absolute coordinate in screen space

            Returns:
                tuple[float, float]: (x, y) absolute coordinates in screenshot
                space; the input is returned unchanged when no hitbox matches.
            """
            match = await self._remap(x, y, "target", "hitbox")
            if match is None:
                return x, y
            return match[1]
import pyautogui
import time
async def main():
    """Manual smoke test: capture dioramas, visualise hitboxes, and track the mouse.

    Steps, in order:
      1. Screenshot two dioramas and save them under ``app_screenshots/``.
      2. Screenshot a Safari diorama, outline and number its hitboxes, and
         save that as ``desktop3_hitboxes.png``.
      3. Poll the real mouse position until Ctrl+C, writing a frame with the
         position mapped into screenshot space.
      4. Move the cursor left to right along a sine-shaped path, saving a
         screenshot with a marker after each move.
    """
    import math

    # App names are passed as separate arguments, matching create_from_apps(*args).
    desktop1 = Diorama.create_from_apps("Discord", "Notes")
    desktop2 = Diorama.create_from_apps("Terminal")
    img1 = await desktop1.interface.screenshot(as_bytes=False)
    img2 = await desktop2.interface.screenshot(as_bytes=False)
    img1.save("app_screenshots/desktop1.png")
    img2.save("app_screenshots/desktop2.png")

    # Initialize Diorama desktop
    desktop3 = Diorama.create_from_apps("Safari")
    screen_size = await desktop3.interface.get_screen_size()
    print(screen_size)

    # Take initial screenshot
    img = await desktop3.interface.screenshot(as_bytes=False)
    img.save("app_screenshots/desktop3.png")

    # Prepare hitboxes and draw on the single screenshot
    hitboxes = desktop3.interface._scene_hitboxes[::-1]
    base_img = img.copy()
    draw = ImageDraw.Draw(base_img)
    for idx, h in enumerate(hitboxes):
        rect = h.get("hitbox")
        if not rect or len(rect) != 4:
            continue
        draw.rectangle(rect, outline="red", width=2)
        # Label each outlined hitbox with its index at the top-left corner.
        draw.text((rect[0], rect[1]), str(idx), fill="red")
    base_img.save("app_screenshots/desktop3_hitboxes.png")

    # Track and draw mouse position in real time (single screenshot size)
    last_mouse_pos = None
    print("Tracking mouse... Press Ctrl+C to stop.")
    try:
        while True:
            mouse_x, mouse_y = pyautogui.position()
            if last_mouse_pos != (mouse_x, mouse_y):
                last_mouse_pos = (mouse_x, mouse_y)
                # Map to screenshot coordinates
                sx, sy = await desktop3.interface.to_screenshot_coordinates(mouse_x, mouse_y)
                # Draw on a copy of the screenshot
                frame = base_img.copy()
                frame_draw = ImageDraw.Draw(frame)
                frame_draw.ellipse((sx-5, sy-5, sx+5, sy+5), fill="blue", outline="blue")
                # Save the frame
                frame.save("app_screenshots/desktop3_mouse.png")
                print(f"Mouse at screen ({mouse_x}, {mouse_y}) -> screenshot ({sx:.1f}, {sy:.1f})")
            # Blocking sleep, kept as in the original; it stalls the event loop
            # between polls.
            time.sleep(0.05)  # Throttle updates to ~20 FPS
    except KeyboardInterrupt:
        print("Stopped tracking.")

    # Move the mouse left to right along a sine wave across the screen.
    step = 20  # pixels per move
    dot_radius = 10
    width = screen_size["width"]
    height = screen_size["height"]
    x, y = 0, 10
    while x < width and y < height:
        await desktop3.interface.move_cursor(x, y)
        img = await desktop3.interface.screenshot(as_bytes=False)
        draw = ImageDraw.Draw(img)
        draw.ellipse((x-dot_radius, y-dot_radius, x+dot_radius, y+dot_radius), fill="red")
        img.save("current.png")
        await asyncio.sleep(0.03)
        x += step
        y = math.sin(x / width * math.pi * 2) * 50 + 25
# Script entry point: when executed directly, run the main() coroutine via asyncio.run.
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -1,27 +0,0 @@
import asyncio
class DioramaComputer:
    """
    A minimal Computer-like interface for Diorama, compatible with ComputerAgent.

    Implements _initialized, run(), and the async context-manager protocol
    (__aenter__ / __aexit__) for agent compatibility. The object re-exports the
    wrapped diorama's ``interface`` and ``agent`` attributes.
    """

    def __init__(self, diorama):
        self.diorama = diorama
        self.interface = self.diorama.interface
        self.agent = self.diorama.agent
        self._initialized = False

    async def __aenter__(self):
        """Mark the computer as initialized and return it."""
        # Ensure the event loop is running (for compatibility). When this
        # coroutine is driven by an event loop the lookup succeeds and the
        # fallback below is not taken.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            asyncio.set_event_loop(asyncio.new_event_loop())
        self._initialized = True
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Leave the ``async with`` block.

        Nothing is acquired in ``__aenter__``, so there is nothing to release;
        ``_initialized`` is left set so ``run()`` stays a no-op afterwards.
        Returns ``None`` so exceptions raised inside the block propagate.
        """
        return None

    async def run(self):
        """Initialize on first use and return ``self`` (compatibility stub)."""
        if not self._initialized:
            await self.__aenter__()
        return self

File diff suppressed because it is too large Load Diff

View File

@@ -1,199 +0,0 @@
#!/usr/bin/env python3
"""
UI Safezone Helper - A utility to get accurate bounds for macOS UI elements
This module provides helper functions to get accurate bounds for macOS UI elements
like the menubar and dock, which are needed for proper screenshot composition.
"""
import sys
import time
from typing import Dict, Any, Optional, Tuple
# Import Objective-C bridge libraries
try:
import AppKit
from ApplicationServices import (
AXUIElementCreateSystemWide,
AXUIElementCreateApplication,
AXUIElementCopyAttributeValue,
AXUIElementCopyAttributeValues,
kAXChildrenAttribute,
kAXRoleAttribute,
kAXTitleAttribute,
kAXPositionAttribute,
kAXSizeAttribute,
kAXErrorSuccess,
AXValueGetType,
kAXValueCGSizeType,
kAXValueCGPointType,
AXUIElementGetTypeID,
AXValueGetValue,
kAXMenuBarAttribute,
)
from AppKit import NSWorkspace, NSRunningApplication
import Foundation
except ImportError:
print("Error: This script requires PyObjC to be installed.")
print("Please install it with: pip install pyobjc")
sys.exit(1)
# Constants for accessibility API
# NOTE(review): every name below except kAXSubroleAttribute is also imported
# from ApplicationServices in the try block above. These assignments run after
# that import and rebind the names to literals, so the functions in this module
# use the values written here. Confirm the duplication is intentional.
kAXErrorSuccess = 0
kAXRoleAttribute = "AXRole"
kAXSubroleAttribute = "AXSubrole"
kAXTitleAttribute = "AXTitle"
kAXPositionAttribute = "AXPosition"
kAXSizeAttribute = "AXSize"
kAXChildrenAttribute = "AXChildren"
kAXMenuBarAttribute = "AXMenuBar"
def element_attribute(element, attribute):
    """Read a single attribute from an accessibility element.

    The children attribute is first requested through
    ``AXUIElementCopyAttributeValues`` (called with 0 and 999 as its numeric
    arguments); an ``NSArray`` result is converted to a Python list. If that
    call does not report success, or for any other attribute, the lookup
    falls back to ``AXUIElementCopyAttributeValue``.

    Returns:
        The attribute value, or ``None`` when the lookup does not succeed.
    """
    wants_children = attribute == kAXChildrenAttribute
    if wants_children:
        status, items = AXUIElementCopyAttributeValues(element, attribute, 0, 999, None)
        if status == kAXErrorSuccess:
            return list(items) if isinstance(items, Foundation.NSArray) else items

    status, result = AXUIElementCopyAttributeValue(element, attribute, None)
    return result if status == kAXErrorSuccess else None
def element_value(element, type):
    """Extract the value carried by an accessibility value wrapper.

    Args:
        element: The wrapped value passed to ``AXValueGetValue``.
        type: The value-type constant to extract. The name shadows the
            builtin but is kept because it is part of the signature.

    Returns:
        The extracted value when the call's first result equals ``True``,
        otherwise ``None``.
    """
    succeeded, extracted = AXValueGetValue(element, type, None)
    # Equality rather than truthiness is kept on purpose: only a flag that
    # compares equal to True counts as success.
    return extracted if succeeded == True else None  # noqa: E712
def get_element_bounds(element):
    """Return an element's position and size as a dict.

    Position and size are read independently; whichever cannot be read or
    decoded keeps its default of 0.

    Returns:
        Dictionary with ``x``, ``y``, ``width`` and ``height`` keys.
    """
    x = y = width = height = 0

    # Position: the attribute holds a wrapped point that must be decoded.
    raw_position = element_attribute(element, kAXPositionAttribute)
    if raw_position:
        point = element_value(raw_position, kAXValueCGPointType)
        if point:
            x, y = point.x, point.y

    # Size: same two-step read, decoded as a size structure.
    raw_size = element_attribute(element, kAXSizeAttribute)
    if raw_size:
        extent = element_value(raw_size, kAXValueCGSizeType)
        if extent:
            width, height = extent.width, extent.height

    return {"x": x, "y": y, "width": width, "height": height}
def find_dock_process():
    """Return the Dock's process identifier.

    The first running application whose localized name is "Dock" and whose
    bundle identifier is "com.apple.dock" is used; ``None`` is returned when
    no running application matches both.
    """
    workspace = NSWorkspace.sharedWorkspace()
    return next(
        (
            candidate.processIdentifier()
            for candidate in workspace.runningApplications()
            if candidate.localizedName() == "Dock"
            and candidate.bundleIdentifier() == "com.apple.dock"
        ),
        None,
    )
def get_menubar_bounds():
    """Get the bounds of the macOS menubar.

    The menubar is looked up on the system-wide accessibility element first
    and, failing that, on the frontmost application's element. When neither
    yields a menubar, an error is printed and a fixed default rectangle is
    returned.

    Returns:
        Dictionary with x, y, width, height of the menubar
    """
    system_wide = AXUIElementCreateSystemWide()
    menubar = element_attribute(system_wide, kAXMenuBarAttribute)
    if menubar is not None:
        return get_element_bounds(menubar)

    # Not available system-wide: ask the frontmost app for its menubar.
    front_app = NSWorkspace.sharedWorkspace().frontmostApplication()
    if front_app:
        front_element = AXUIElementCreateApplication(front_app.processIdentifier())
        menubar = element_attribute(front_element, kAXMenuBarAttribute)

    if menubar is None:
        print("Error: Could not get menubar")
        # Default menubar bounds as fallback
        return {"x": 0, "y": 0, "width": 1800, "height": 24}

    return get_element_bounds(menubar)
def get_dock_bounds():
    """Get the bounds of the macOS Dock.

    The Dock process is located, its accessibility children are listed, and
    the bounds of the first child whose role is "AXList" are returned. Every
    failure along the way prints an error and yields an all-zero rectangle.

    Returns:
        Dictionary with x, y, width, height of the Dock
    """

    def _give_up(message):
        # Report the problem and hand back a fresh all-zero rectangle.
        print(message)
        return {"x": 0, "y": 0, "width": 0, "height": 0}

    dock_pid = find_dock_process()
    if dock_pid is None:
        return _give_up("Error: Could not find Dock process")

    # Accessibility element for the Dock application
    dock_element = AXUIElementCreateApplication(dock_pid)
    if dock_element is None:
        return _give_up(f"Error: Could not create accessibility element for Dock (PID {dock_pid})")

    children = element_attribute(dock_element, kAXChildrenAttribute)
    if not (children and len(children) != 0):
        return _give_up("Error: Could not get Dock children")

    # The main dock list is the first child reporting the AXList role.
    dock_list = next(
        (child for child in children if element_attribute(child, kAXRoleAttribute) == "AXList"),
        None,
    )
    if dock_list is None:
        return _give_up("Error: Could not find Dock list")

    return get_element_bounds(dock_list)
def get_ui_element_bounds():
    """Collect the bounds of the menubar and the Dock.

    Returns:
        Dictionary with a ``"menubar"`` and a ``"dock"`` entry, each holding
        the rectangle returned by the corresponding getter. The menubar is
        queried before the Dock.
    """
    return {
        "menubar": get_menubar_bounds(),
        "dock": get_dock_bounds(),
    }
# Script entry point: when executed directly, print the detected menubar and Dock bounds.
if __name__ == "__main__":
# Example usage
bounds = get_ui_element_bounds()
print("Menubar bounds:", bounds["menubar"])
print("Dock bounds:", bounds["dock"])