mirror of
https://github.com/trycua/computer.git
synced 2026-01-03 12:00:00 -06:00
added diorama to server side
This commit is contained in:
3
libs/computer-server/computer_server/diorama/__init__.py
Normal file
3
libs/computer-server/computer_server/diorama/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from .diorama import Diorama
|
||||
|
||||
__all__ = ["Diorama"]
|
||||
4
libs/computer-server/computer_server/diorama/base.py
Normal file
4
libs/computer-server/computer_server/diorama/base.py
Normal file
@@ -0,0 +1,4 @@
|
||||
class BaseDioramaHandler:
    """Fallback Diorama handler used on operating systems without Diorama support."""

    async def diorama_cmd(self, action: str, arguments: dict = None) -> dict:
        """Reject a Diorama command.

        Args:
            action: Name of the requested Diorama action (ignored).
            arguments: Arguments for the action (ignored).

        Returns:
            A failure payload with ``success`` set to ``False`` and an
            ``error`` message saying the OS is unsupported.
        """
        unsupported = {
            "success": False,
            "error": "Diorama is not supported on this OS yet.",
        }
        return unsupported
|
||||
355
libs/computer-server/computer_server/diorama/diorama.py
Normal file
355
libs/computer-server/computer_server/diorama/diorama.py
Normal file
@@ -0,0 +1,355 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Diorama: A virtual desktop manager for macOS"""
|
||||
|
||||
import os
|
||||
import asyncio
|
||||
import logging
|
||||
import sys
|
||||
import io
|
||||
from typing import Union
|
||||
from PIL import Image, ImageDraw
|
||||
|
||||
from draw import capture_all_apps, AppActivationContext, get_frontmost_and_active_app, get_all_windows, get_running_apps
|
||||
|
||||
from diorama_computer import DioramaComputer
|
||||
from computer_server.handlers.macos import *
|
||||
|
||||
# Timestamped, level-tagged log lines written to stdout.
logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    datefmt="%H:%M:%S",
    format="[%(asctime)s] [%(levelname)s] %(message)s",
)
logger = logging.getLogger("diorama.virtual_desktop")

# Single module-level automation handler used for all mouse/keyboard actions.
automation_handler = MacOSAutomationHandler()
|
||||
|
||||
class Diorama:
    """Virtual desktop restricted to a whitelist of applications.

    Commands issued through any ``Diorama.Interface`` are placed on one
    class-level ``asyncio.Queue`` and executed one at a time by
    ``_scheduler_loop``. Each command carries the issuing instance's
    ``app_list`` and a future on which the result (or error) is delivered.
    """

    # Scheduler state shared by every Diorama instance.
    _scheduler_queue = None
    _scheduler_task = None
    _loop = None
    _scheduler_started = False

    # Actions forwarded to the automation handler as ``handler.<action>(x, y)``.
    _POINTER_ACTIONS = ("left_click", "right_click", "double_click", "move_cursor", "drag_to")

    @staticmethod
    def _normalize_app_list(args):
        """Turn ``create_from_apps`` arguments into a flat list of app names.

        Accepts both ``create_from_apps("A", "B")`` and
        ``create_from_apps(["A", "B"])``; the latter previously produced a
        nested ``(["A", "B"],)`` app list.
        """
        if len(args) == 1 and isinstance(args[0], (list, tuple, set, frozenset)):
            return list(args[0])
        return list(args)

    @classmethod
    def create_from_apps(cls, *args) -> "DioramaComputer":
        """Create a Diorama for the given apps and return its computer wrapper.

        Args:
            *args: Application names, either as separate arguments or as a
                single list/tuple/set of names.

        Returns:
            The ``DioramaComputer`` bound to the new Diorama.
        """
        cls._ensure_scheduler()
        return cls(cls._normalize_app_list(args)).computer

    def __init__(self, app_list):
        """Bind the app whitelist and build the interface/computer wrappers."""
        self.app_list = app_list
        self.interface = self.Interface(self)
        self.computer = DioramaComputer(self)
        self.focus_context = None

    @classmethod
    def _ensure_scheduler(cls):
        """Start the shared scheduler task if it is not running on this loop.

        The scheduler is (re)created when it was never started, when its task
        has finished, or when the current event loop differs from the one the
        task was created on; in those cases the queue would never be drained.
        """
        loop = asyncio.get_event_loop()
        task = cls._scheduler_task
        if (
            cls._scheduler_started
            and task is not None
            and not task.done()
            and cls._loop is loop
        ):
            return
        logger.info("Starting Diorama scheduler loop…")
        cls._scheduler_queue = asyncio.Queue()
        cls._loop = loop
        cls._scheduler_task = loop.create_task(cls._scheduler_loop())
        cls._scheduler_started = True

    @classmethod
    async def _scheduler_loop(cls):
        """Drain the command queue forever, resolving each command's future.

        Any exception raised while preparing or executing a command is
        delivered to that command's future instead of terminating the loop.
        """
        while True:
            cmd = await cls._scheduler_queue.get()
            action = cmd.get("action")
            args = cmd.get("arguments", {})
            future = cmd.get("future")
            logger.info(f"Processing command: {action} | args={args}")

            try:
                result, error = await cls._run_command(action, args)
            except Exception as e:
                # Window enumeration or focus switching failed.
                result, error = None, e

            if error is not None:
                logger.error(f"Exception during {action}: {error}", exc_info=error)

            # The caller may have cancelled the future while we were working;
            # resolving a finished future raises InvalidStateError.
            if future is None or future.done():
                continue
            if error is not None:
                future.set_exception(error)
            else:
                future.set_result(result)

    @classmethod
    async def _run_command(cls, action, args):
        """Activate the target app and execute one command.

        Returns:
            ``(result, error)``; exactly one of the two is meaningful.
            Errors raised by the action itself are caught inside the focus
            context so the context always exits without an exception.
        """
        app_whitelist = args.get("app_list", [])

        all_windows = get_all_windows()
        running_apps = get_running_apps()
        frontmost_app, active_app_to_use, active_app_pid = get_frontmost_and_active_app(
            all_windows, running_apps, app_whitelist
        )
        focus_context = AppActivationContext(active_app_pid, active_app_to_use, logger)

        with focus_context:
            try:
                return await cls._dispatch(action, args), None
            except Exception as e:
                return None, e

    @classmethod
    async def _dispatch(cls, action, args):
        """Execute ``action`` and return the value for the command's future.

        Raises:
            ValueError: If ``action`` is not a known command.
        """
        if action == "screenshot":
            app_whitelist = list(args.get("app_list", []))
            logger.info(f"Taking screenshot for apps: {app_whitelist}")
            result, img = capture_all_apps(
                app_whitelist=app_whitelist,
                save_to_disk=False,
                take_focus=False
            )
            logger.info("Screenshot complete.")
            return result, img

        # Mouse actions
        if action in cls._POINTER_ACTIONS:
            x = args.get("x")
            y = args.get("y")
            if action == "drag_to":
                await automation_handler.drag_to(x, y, duration=args.get("duration", 0.5))
            else:
                await getattr(automation_handler, action)(x, y)
            return None

        # Keyboard actions
        if action == "type_text":
            await automation_handler.type_text(args.get("text"))
            return None
        if action == "press_key":
            await automation_handler.press_key(args.get("key"))
            return None
        if action == "hotkey":
            await automation_handler.hotkey(args.get("keys", []))
            return None

        if action == "get_cursor_position":
            return await automation_handler.get_cursor_position()

        logger.warning(f"Unknown action: {action}")
        raise ValueError(f"Unknown action: {action}")

    class Interface():
        """Per-Diorama command interface working in screenshot coordinates."""

        def __init__(self, diorama):
            self._diorama = diorama

            # Populated by screenshot(): hitbox records and image size.
            self._scene_hitboxes = []
            self._scene_size = None

        async def _send_cmd(self, action, arguments=None):
            """Queue a command for the scheduler and wait for its result.

            Args:
                action: Command name understood by the scheduler.
                arguments: Extra command arguments; ``app_list`` is added
                    from the owning Diorama.

            Returns:
                Whatever the scheduler resolved the command's future with.

            Raises:
                asyncio.CancelledError: Re-raised after logging so that
                    cancellation of the calling task is not swallowed.
                Exception: Any error the scheduler recorded for the command.
            """
            Diorama._ensure_scheduler()
            future = asyncio.get_running_loop().create_future()
            logger.info(f"Enqueuing {action} command for apps: {self._diorama.app_list}")
            await Diorama._scheduler_queue.put({
                "action": action,
                "arguments": {"app_list": self._diorama.app_list, **(arguments or {})},
                "future": future
            })
            try:
                return await future
            except asyncio.CancelledError:
                logger.warning(f"Command was cancelled: {action}")
                raise

        async def screenshot(self, as_bytes: bool = True) -> "Union[bytes, Image.Image]":
            """Capture the whitelisted apps and refresh the cached scene data.

            Args:
                as_bytes: Return PNG-encoded bytes when true, otherwise the
                    PIL image object.
            """
            result, img = await self._send_cmd("screenshot")
            self._scene_hitboxes = result.get("hitboxes", [])
            self._scene_size = img.size

            if not as_bytes:
                return img

            # PIL Image to bytes
            buffer = io.BytesIO()
            img.save(buffer, format="PNG")
            return buffer.getvalue()

        async def left_click(self, x, y):
            sx, sy = await self.to_screen_coordinates(x, y)
            await self._send_cmd("left_click", {"x": sx, "y": sy})

        async def right_click(self, x, y):
            sx, sy = await self.to_screen_coordinates(x, y)
            await self._send_cmd("right_click", {"x": sx, "y": sy})

        async def double_click(self, x, y):
            sx, sy = await self.to_screen_coordinates(x, y)
            await self._send_cmd("double_click", {"x": sx, "y": sy})

        async def move_cursor(self, x, y):
            sx, sy = await self.to_screen_coordinates(x, y)
            await self._send_cmd("move_cursor", {"x": sx, "y": sy})

        async def drag_to(self, x, y, duration=0.5):
            sx, sy = await self.to_screen_coordinates(x, y)
            await self._send_cmd("drag_to", {"x": sx, "y": sy, "duration": duration})

        async def get_cursor_position(self):
            return await self._send_cmd("get_cursor_position")

        async def type_text(self, text):
            await self._send_cmd("type_text", {"text": text})

        async def press_key(self, key):
            await self._send_cmd("press_key", {"key": key})

        async def hotkey(self, *keys):
            await self._send_cmd("hotkey", {"keys": list(keys)})

        async def get_screen_size(self) -> dict[str, int]:
            """Return the size of the last screenshot, taking one if needed."""
            if not self._scene_size:
                await self.screenshot()
            return { "width": self._scene_size[0], "height": self._scene_size[1] }

        @staticmethod
        def _remap_point(hitboxes, x, y, source_key, target_key):
            """Translate ``(x, y)`` from one hitbox rectangle to its counterpart.

            Hitboxes are searched from last to first. The first record whose
            ``source_key`` rectangle contains the point is used; the point
            keeps its offset from that rectangle's top-left corner relative
            to the ``target_key`` rectangle. Records lacking a 4-element
            rectangle under either key are skipped.

            Returns:
                ``(hitbox, (tx, ty))`` on a match, otherwise
                ``(None, (x, y))`` with the point unchanged.
            """
            for h in reversed(hitboxes):
                rect_from = h.get(source_key)
                rect_to = h.get(target_key)
                if not rect_from or len(rect_from) != 4:
                    continue
                if not rect_to or len(rect_to) != 4:
                    continue

                x0, y0, x1, y1 = rect_from
                if x0 <= x <= x1 and y0 <= y <= y1:
                    return h, (rect_to[0] + (x - x0), rect_to[1] + (y - y0))
            return None, (x, y)

        async def to_screen_coordinates(self, x: float, y: float) -> tuple[float, float]:
            """Convert screenshot coordinates to screen coordinates.

            Args:
                x: X absolute coordinate in screenshot space
                y: Y absolute coordinate in screenshot space

            Returns:
                tuple[float, float]: (x, y) absolute coordinates in screen space
            """
            if not self._scene_hitboxes:
                await self.screenshot()  # get hitboxes
            h, point = self._remap_point(self._scene_hitboxes, x, y, "hitbox", "target")
            if h is not None:
                logger.info(f"Found hitbox: {h}")
            return point

        async def to_screenshot_coordinates(self, x: float, y: float) -> tuple[float, float]:
            """Convert screen coordinates to screenshot coordinates.

            Args:
                x: X absolute coordinate in screen space
                y: Y absolute coordinate in screen space

            Returns:
                tuple[float, float]: (x, y) absolute coordinates in screenshot space
            """
            if not self._scene_hitboxes:
                await self.screenshot()  # get hitboxes
            _, point = self._remap_point(self._scene_hitboxes, x, y, "target", "hitbox")
            return point
|
||||
|
||||
import pyautogui
|
||||
import time
|
||||
|
||||
async def main():
    """Manual demo: capture several Dioramas and visualise coordinate mapping.

    Writes PNG files under ``app_screenshots/`` and ``current.png`` in the
    working directory. The mouse-tracking phase runs until interrupted.
    """
    import math

    # Image.save does not create missing directories.
    os.makedirs("app_screenshots", exist_ok=True)

    # create_from_apps takes app names as separate arguments.
    desktop1 = Diorama.create_from_apps("Discord", "Notes")
    desktop2 = Diorama.create_from_apps("Terminal")

    img1 = await desktop1.interface.screenshot(as_bytes=False)
    img2 = await desktop2.interface.screenshot(as_bytes=False)

    img1.save("app_screenshots/desktop1.png")
    img2.save("app_screenshots/desktop2.png")

    # Initialize Diorama desktop
    desktop3 = Diorama.create_from_apps("Safari")
    screen_size = await desktop3.interface.get_screen_size()
    print(screen_size)

    # Take initial screenshot
    img = await desktop3.interface.screenshot(as_bytes=False)
    img.save("app_screenshots/desktop3.png")

    # Outline and number every hitbox on a copy of the screenshot
    hitboxes = desktop3.interface._scene_hitboxes[::-1]
    base_img = img.copy()
    draw = ImageDraw.Draw(base_img)
    for idx, h in enumerate(hitboxes):
        rect = h.get("hitbox")
        if not rect or len(rect) != 4:
            continue
        draw.rectangle(rect, outline="red", width=2)
        draw.text((rect[0], rect[1]), str(idx), fill="red")
    base_img.save("app_screenshots/desktop3_hitboxes.png")

    # Track and draw mouse position in real time (single screenshot size)
    last_mouse_pos = None
    print("Tracking mouse... Press Ctrl+C to stop.")
    try:
        while True:
            mouse_x, mouse_y = pyautogui.position()
            if last_mouse_pos != (mouse_x, mouse_y):
                last_mouse_pos = (mouse_x, mouse_y)
                # Map to screenshot coordinates
                sx, sy = await desktop3.interface.to_screenshot_coordinates(mouse_x, mouse_y)
                # Draw on a copy of the screenshot
                frame = base_img.copy()
                frame_draw = ImageDraw.Draw(frame)
                frame_draw.ellipse((sx-5, sy-5, sx+5, sy+5), fill="blue", outline="blue")
                # Save the frame
                frame.save("app_screenshots/desktop3_mouse.png")
                print(f"Mouse at screen ({mouse_x}, {mouse_y}) -> screenshot ({sx:.1f}, {sy:.1f})")
            # Yield to the event loop (time.sleep would block the scheduler
            # task); throttles updates to ~20 FPS.
            await asyncio.sleep(0.05)
    except KeyboardInterrupt:
        # NOTE(review): on newer Python versions asyncio.run() is documented
        # to cancel the main task on Ctrl+C instead of raising here, in which
        # case the sweep below is skipped — confirm the intended behaviour.
        print("Stopped tracking.")

    # Sweep the cursor left-to-right along a sine wave, marking each position
    step = 20  # pixels per move
    dot_radius = 10
    width = screen_size["width"]
    height = screen_size["height"]
    x, y = 0, 10

    while x < width and y < height:
        await desktop3.interface.move_cursor(x, y)
        img = await desktop3.interface.screenshot(as_bytes=False)
        draw = ImageDraw.Draw(img)
        draw.ellipse((x-dot_radius, y-dot_radius, x+dot_radius, y+dot_radius), fill="red")
        img.save("current.png")
        await asyncio.sleep(0.03)
        x += step
        y = math.sin(x / width * math.pi * 2) * 50 + 25


if __name__ == "__main__":
    asyncio.run(main())
|
||||
@@ -0,0 +1,26 @@
|
||||
import asyncio
|
||||
|
||||
class DioramaComputer:
    """
    A minimal Computer-like interface for Diorama, compatible with ComputerAgent.
    Implements _initialized, run(), and the async context-manager protocol
    (__aenter__/__aexit__) for agent compatibility.
    """

    def __init__(self, diorama):
        """Wrap ``diorama`` and expose its ``interface`` attribute."""
        self.diorama = diorama
        self.interface = self.diorama.interface
        self._initialized = False

    async def __aenter__(self):
        """Mark the computer as initialized and return it.

        A coroutine only executes inside a running event loop, so no loop
        setup is needed here.
        """
        self._initialized = True
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Complete the ``async with`` protocol; nothing is torn down.

        Returns ``False`` so exceptions raised inside the block propagate.
        """
        return False

    async def run(self):
        """Compatibility stub: initialize on first use and return ``self``."""
        if not self._initialized:
            await self.__aenter__()
        return self
|
||||
1211
libs/computer-server/computer_server/diorama/draw.py
Normal file
1211
libs/computer-server/computer_server/diorama/draw.py
Normal file
File diff suppressed because it is too large
Load Diff
29
libs/computer-server/computer_server/diorama/macos.py
Normal file
29
libs/computer-server/computer_server/diorama/macos.py
Normal file
@@ -0,0 +1,29 @@
|
||||
import platform
|
||||
import sys
|
||||
import platform
|
||||
import inspect
|
||||
from .diorama import Diorama
|
||||
from .base import BaseDioramaHandler
|
||||
|
||||
class MacOSDioramaHandler(BaseDioramaHandler):
    """Handler for Diorama commands on macOS, using local diorama module."""

    @staticmethod
    def _split_call_args(method, kwargs):
        """Separate ``kwargs`` into positional and keyword call arguments.

        When ``method`` declares a ``*name`` parameter and ``kwargs`` holds a
        value under that name (e.g. ``hotkey(*keys)`` receiving
        ``{"keys": [...]}``), the value is expanded into positional
        arguments, since a var-positional parameter cannot be passed by
        keyword.
        """
        positional = ()
        for param in inspect.signature(method).parameters.values():
            if param.kind is inspect.Parameter.VAR_POSITIONAL and param.name in kwargs:
                positional = tuple(kwargs.pop(param.name))
        return positional, kwargs

    async def diorama_cmd(self, action: str, arguments: dict = None) -> dict:
        """Run one public ``Diorama.Interface`` method for the given apps.

        Args:
            action: Name of a public method on ``Diorama.Interface``.
            arguments: Must contain ``app_list``; the remaining entries are
                passed to the method.

        Returns:
            ``{"success": True, "result": ...}`` on success, otherwise
            ``{"success": False, "error": ...}`` (plus ``trace`` when an
            exception was raised).
        """
        if platform.system().lower() != "darwin":
            return {"success": False, "error": "Diorama is only supported on macOS."}
        try:
            # Copy so the caller's dict is not mutated.
            call_kwargs = dict(arguments or {})
            # app_list selects the Diorama; interface methods do not accept it.
            app_list = call_kwargs.pop("app_list", None)
            if not app_list:
                return {"success": False, "error": "Missing 'app_list' in arguments"}
            diorama = Diorama(app_list)
            interface = diorama.interface
            # Only public callables are exposed; underscore-prefixed
            # attributes are internal state and helpers.
            method = None if action.startswith("_") else getattr(interface, action, None)
            if not callable(method):
                return {"success": False, "error": f"Unknown diorama action: {action}"}
            call_args, call_kwargs = self._split_call_args(method, call_kwargs)
            if inspect.iscoroutinefunction(method):
                result = await method(*call_args, **call_kwargs)
            else:
                result = method(*call_args, **call_kwargs)
            # NOTE(review): the result is returned as-is (e.g. PNG bytes for
            # screenshot); confirm the transport can serialize it and that
            # its shape matches what the client proxy expects.
            return {"success": True, "result": result}
        except Exception as e:
            import traceback
            return {"success": False, "error": str(e), "trace": traceback.format_exc()}
|
||||
199
libs/computer-server/computer_server/diorama/safezone.py
Normal file
199
libs/computer-server/computer_server/diorama/safezone.py
Normal file
@@ -0,0 +1,199 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
UI Safezone Helper - A utility to get accurate bounds for macOS UI elements
|
||||
|
||||
This module provides helper functions to get accurate bounds for macOS UI elements
|
||||
like the menubar and dock, which are needed for proper screenshot composition.
|
||||
"""
|
||||
|
||||
import sys
|
||||
import time
|
||||
from typing import Dict, Any, Optional, Tuple
|
||||
|
||||
# Import Objective-C bridge libraries
|
||||
try:
|
||||
import AppKit
|
||||
from ApplicationServices import (
|
||||
AXUIElementCreateSystemWide,
|
||||
AXUIElementCreateApplication,
|
||||
AXUIElementCopyAttributeValue,
|
||||
AXUIElementCopyAttributeValues,
|
||||
kAXChildrenAttribute,
|
||||
kAXRoleAttribute,
|
||||
kAXTitleAttribute,
|
||||
kAXPositionAttribute,
|
||||
kAXSizeAttribute,
|
||||
kAXErrorSuccess,
|
||||
AXValueGetType,
|
||||
kAXValueCGSizeType,
|
||||
kAXValueCGPointType,
|
||||
AXUIElementGetTypeID,
|
||||
AXValueGetValue,
|
||||
kAXMenuBarAttribute,
|
||||
)
|
||||
from AppKit import NSWorkspace, NSRunningApplication
|
||||
import Foundation
|
||||
except ImportError:
|
||||
print("Error: This script requires PyObjC to be installed.")
|
||||
print("Please install it with: pip install pyobjc")
|
||||
sys.exit(1)
|
||||
|
||||
# Accessibility API constants. These assignments rebind several names that
# the import block above also imports from ApplicationServices.
kAXErrorSuccess = 0  # status code compared against AX call results

# Attribute-name strings passed to the AX attribute accessors.
kAXRoleAttribute = "AXRole"
kAXSubroleAttribute = "AXSubrole"
kAXTitleAttribute = "AXTitle"
kAXPositionAttribute = "AXPosition"
kAXSizeAttribute = "AXSize"
kAXChildrenAttribute = "AXChildren"
kAXMenuBarAttribute = "AXMenuBar"
|
||||
|
||||
|
||||
def element_attribute(element, attribute):
    """Read one attribute of an accessibility element.

    The children attribute is fetched through the multi-value call (indices
    0 up to 999) and converted to a Python list when it is an ``NSArray``.
    If that call fails, or for any other attribute, the single-value call
    is used.

    Returns:
        The attribute value, or ``None`` when the lookup does not succeed.
    """
    if attribute == kAXChildrenAttribute:
        status, children = AXUIElementCopyAttributeValues(element, attribute, 0, 999, None)
        if status == kAXErrorSuccess:
            return list(children) if isinstance(children, Foundation.NSArray) else children

    status, result = AXUIElementCopyAttributeValue(element, attribute, None)
    return result if status == kAXErrorSuccess else None
|
||||
|
||||
|
||||
def element_value(element, type):
    """Unwrap an AXValue into the native value of the requested type.

    Args:
        element: The AXValue object to unwrap.
        type: AXValue type constant to extract (e.g. ``kAXValueCGPointType``).

    Returns:
        The unwrapped value, or ``None`` when extraction fails.
    """
    # The first item returned is a success flag, not an error code.
    ok, value = AXValueGetValue(element, type, None)
    if ok:
        return value
    return None
|
||||
|
||||
|
||||
def get_element_bounds(element):
    """Return the position and size of an accessibility element.

    Returns:
        Dictionary with ``x``, ``y``, ``width`` and ``height``. Components
        that cannot be read stay at 0.
    """
    left = top = 0
    wide = tall = 0

    # Position: read the AX attribute, then unwrap it as a CGPoint.
    raw_position = element_attribute(element, kAXPositionAttribute)
    if raw_position:
        point = element_value(raw_position, kAXValueCGPointType)
        if point:
            left, top = point.x, point.y

    # Size: read the AX attribute, then unwrap it as a CGSize.
    raw_size = element_attribute(element, kAXSizeAttribute)
    if raw_size:
        extent = element_value(raw_size, kAXValueCGSizeType)
        if extent:
            wide, tall = extent.width, extent.height

    return {"x": left, "y": top, "width": wide, "height": tall}
|
||||
|
||||
|
||||
def find_dock_process():
    """Return the PID of the running Dock application, or ``None``.

    An application matches when its localized name is "Dock" and its bundle
    identifier is "com.apple.dock".
    """
    workspace = NSWorkspace.sharedWorkspace()
    return next(
        (
            candidate.processIdentifier()
            for candidate in workspace.runningApplications()
            if candidate.localizedName() == "Dock"
            and candidate.bundleIdentifier() == "com.apple.dock"
        ),
        None,
    )
|
||||
|
||||
|
||||
def get_menubar_bounds():
    """Get the bounds of the macOS menubar

    The menubar is looked up on the system-wide accessibility element first
    and, failing that, on the frontmost application.

    Returns:
        Dictionary with x, y, width, height of the menubar. When no menubar
        element can be found, a hard-coded 1800x24 rectangle at the origin
        is returned.
    """
    menubar = element_attribute(AXUIElementCreateSystemWide(), kAXMenuBarAttribute)

    if menubar is None:
        # Fall back to the frontmost application's menubar attribute.
        front_app = NSWorkspace.sharedWorkspace().frontmostApplication()
        if front_app:
            front_element = AXUIElementCreateApplication(front_app.processIdentifier())
            menubar = element_attribute(front_element, kAXMenuBarAttribute)

    if menubar is not None:
        return get_element_bounds(menubar)

    print("Error: Could not get menubar")
    # Default menubar bounds as fallback
    return {"x": 0, "y": 0, "width": 1800, "height": 24}
|
||||
|
||||
|
||||
def get_dock_bounds():
    """Get the bounds of the macOS Dock

    Returns:
        Dictionary with x, y, width, height of the Dock's ``AXList`` child.
        All-zero bounds are returned (after printing an error) when any
        lookup step fails.
    """
    def _give_up(message):
        # Report the failure and hand back fresh empty bounds.
        print(message)
        return {"x": 0, "y": 0, "width": 0, "height": 0}

    dock_pid = find_dock_process()
    if dock_pid is None:
        return _give_up("Error: Could not find Dock process")

    # Accessibility element for the Dock application
    dock_element = AXUIElementCreateApplication(dock_pid)
    if dock_element is None:
        return _give_up(f"Error: Could not create accessibility element for Dock (PID {dock_pid})")

    children = element_attribute(dock_element, kAXChildrenAttribute)
    if not children or len(children) == 0:
        return _give_up("Error: Could not get Dock children")

    # The dock list is the first child whose role is AXList.
    dock_list = next(
        (child for child in children if element_attribute(child, kAXRoleAttribute) == "AXList"),
        None,
    )
    if dock_list is None:
        return _give_up("Error: Could not find Dock list")

    return get_element_bounds(dock_list)
|
||||
|
||||
|
||||
def get_ui_element_bounds():
    """Collect the bounds of the menubar and the Dock.

    Returns:
        Dictionary with a ``menubar`` and a ``dock`` entry, each holding the
        bounds dictionary produced by the corresponding getter.
    """
    result = {}
    result["menubar"] = get_menubar_bounds()
    result["dock"] = get_dock_bounds()
    return result
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Example usage: print the bounds of both UI elements.
    bounds = get_ui_element_bounds()
    for label, key in (("Menubar bounds:", "menubar"), ("Dock bounds:", "dock")):
        print(label, bounds[key])
|
||||
@@ -2,11 +2,13 @@ import platform
|
||||
import subprocess
|
||||
from typing import Tuple, Type
|
||||
from .base import BaseAccessibilityHandler, BaseAutomationHandler
|
||||
from computer_server.diorama.base import BaseDioramaHandler
|
||||
|
||||
# Conditionally import platform-specific handlers
|
||||
system = platform.system().lower()
|
||||
if system == 'darwin':
|
||||
from .macos import MacOSAccessibilityHandler, MacOSAutomationHandler
|
||||
from computer_server.diorama.macos import MacOSDioramaHandler
|
||||
elif system == 'linux':
|
||||
from .linux import LinuxAccessibilityHandler, LinuxAutomationHandler
|
||||
|
||||
@@ -38,13 +40,13 @@ class HandlerFactory:
|
||||
raise RuntimeError(f"Failed to determine current OS: {str(e)}")
|
||||
|
||||
@staticmethod
|
||||
def create_handlers() -> Tuple[BaseAccessibilityHandler, BaseAutomationHandler]:
|
||||
def create_handlers() -> Tuple[BaseAccessibilityHandler, BaseAutomationHandler, BaseDioramaHandler]:
|
||||
"""Create and return appropriate handlers for the current OS.
|
||||
|
||||
Returns:
|
||||
Tuple[BaseAccessibilityHandler, BaseAutomationHandler]: A tuple containing
|
||||
the appropriate accessibility and automation handlers for the current OS.
|
||||
|
||||
Tuple[BaseAccessibilityHandler, BaseAutomationHandler, BaseDioramaHandler]: A tuple containing
|
||||
the appropriate accessibility, automation, and diorama handlers for the current OS.
|
||||
|
||||
Raises:
|
||||
NotImplementedError: If the current OS is not supported
|
||||
RuntimeError: If unable to determine the current OS
|
||||
@@ -52,8 +54,8 @@ class HandlerFactory:
|
||||
os_type = HandlerFactory._get_current_os()
|
||||
|
||||
if os_type == 'darwin':
|
||||
return MacOSAccessibilityHandler(), MacOSAutomationHandler()
|
||||
elif os_type == 'linux':
|
||||
return LinuxAccessibilityHandler(), LinuxAutomationHandler()
|
||||
return MacOSAccessibilityHandler(), MacOSAutomationHandler(), MacOSDioramaHandler()
|
||||
else:
|
||||
return LinuxAccessibilityHandler(), LinuxAutomationHandler(), BaseDioramaHandler()
|
||||
|
||||
raise NotImplementedError(f"OS '{os_type}' is not supported")
|
||||
@@ -31,7 +31,7 @@ class ConnectionManager:
|
||||
def __init__(self):
|
||||
self.active_connections: List[WebSocket] = []
|
||||
# Create OS-specific handlers
|
||||
self.accessibility_handler, self.automation_handler = HandlerFactory.create_handlers()
|
||||
self.accessibility_handler, self.automation_handler, self.diorama_handler = HandlerFactory.create_handlers()
|
||||
|
||||
async def connect(self, websocket: WebSocket):
|
||||
await websocket.accept()
|
||||
@@ -72,6 +72,7 @@ async def websocket_endpoint(websocket: WebSocket):
|
||||
"copy_to_clipboard": manager.automation_handler.copy_to_clipboard,
|
||||
"set_clipboard": manager.automation_handler.set_clipboard,
|
||||
"run_command": manager.automation_handler.run_command,
|
||||
"diorama_cmd": manager.diorama_handler.diorama_cmd,
|
||||
}
|
||||
|
||||
try:
|
||||
|
||||
@@ -21,6 +21,19 @@ OSType = Literal["macos", "linux", "windows"]
|
||||
class Computer:
|
||||
"""Computer is the main class for interacting with the computer."""
|
||||
|
||||
def create_desktop_from_apps(self, apps):
    """
    Build a virtual desktop limited to the given applications.

    Args:
        apps (list[str]): List of application names to include in the desktop.

    Returns:
        DioramaComputer: A proxy constructed from this computer and ``apps``.
    """
    # Imported at call time rather than at module import.
    from .diorama_computer import DioramaComputer as _DioramaComputer

    desktop = _DioramaComputer(self, apps)
    return desktop
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
display: Union[Display, Dict[str, int], str] = "1024x768",
|
||||
|
||||
89
libs/computer/computer/diorama_computer.py
Normal file
89
libs/computer/computer/diorama_computer.py
Normal file
@@ -0,0 +1,89 @@
|
||||
import asyncio
|
||||
|
||||
class DioramaComputer:
    """
    A Computer-compatible proxy for Diorama that sends commands over the ComputerInterface.

    Supports ``await computer.run()`` and ``async with computer:``.
    """

    def __init__(self, computer, apps):
        """Bind the underlying computer and the app names for this desktop."""
        self.computer = computer
        self.apps = apps
        self.interface = DioramaComputerInterface(computer, apps)
        self._initialized = False

    async def __aenter__(self):
        """Mark the proxy as initialized and return it."""
        self._initialized = True
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Complete the ``async with`` protocol; nothing is torn down.

        Returns ``False`` so exceptions raised inside the block propagate.
        """
        return False

    async def run(self):
        """Initialize on first use and return ``self``."""
        if not self._initialized:
            await self.__aenter__()
        return self
|
||||
|
||||
class DioramaComputerInterface:
    """
    Diorama Interface proxy that sends diorama_cmds via the Computer's interface.
    """

    def __init__(self, computer, apps):
        self.computer = computer
        self.apps = apps
        # Cached from the most recent screenshot.
        self._scene_hitboxes = []
        self._scene_size = None

    async def _send_cmd(self, action, arguments=None):
        """Send one diorama command and return its ``result`` field.

        Args:
            action: Name of the diorama action.
            arguments: Extra arguments; ``app_list`` is added from ``self.apps``.

        Raises:
            RuntimeError: If the computer's interface is not initialized, the
                response is not a dict, or the response is not successful.
        """
        arguments = {"app_list": self.apps, **(arguments or {})}
        # Use the computer's interface (must be initialized)
        iface = getattr(self.computer, "_interface", None)
        if iface is None:
            raise RuntimeError("Computer interface not initialized. Call run() first.")
        response = await iface.diorama_cmd(action, arguments)
        if not isinstance(response, dict):
            raise RuntimeError(f"Diorama command failed: unexpected response {response!r}")
        if not response.get("success"):
            raise RuntimeError(f"Diorama command failed: {response.get('error')}")
        return response.get("result")

    async def screenshot(self, as_bytes=True):
        """Take a screenshot and refresh the cached hitboxes and scene size.

        Args:
            as_bytes: Return the raw image bytes when true, otherwise a PIL image.

        Raises:
            RuntimeError: If the response carries no ``image_bytes`` payload.
        """
        result = await self._send_cmd("screenshot")
        if not isinstance(result, dict) or not result.get("image_bytes"):
            raise RuntimeError("Diorama screenshot response did not include 'image_bytes'")

        import io
        from PIL import Image

        img_bytes = result["image_bytes"]
        self._scene_hitboxes = result.get("hitboxes", [])
        # Assume server returns PNG bytes
        img = Image.open(io.BytesIO(img_bytes))
        self._scene_size = img.size
        return img_bytes if as_bytes else img

    async def get_screen_size(self):
        """Return the size of the last screenshot, taking one if needed."""
        if not self._scene_size:
            await self.screenshot(as_bytes=False)
        return {"width": self._scene_size[0], "height": self._scene_size[1]}

    async def move_cursor(self, x, y):
        await self._send_cmd("move_cursor", {"x": x, "y": y})

    async def left_click(self, x=None, y=None):
        await self._send_cmd("left_click", {"x": x, "y": y})

    async def right_click(self, x=None, y=None):
        await self._send_cmd("right_click", {"x": x, "y": y})

    async def double_click(self, x=None, y=None):
        await self._send_cmd("double_click", {"x": x, "y": y})

    async def drag_to(self, x, y, duration=0.5):
        await self._send_cmd("drag_to", {"x": x, "y": y, "duration": duration})

    async def get_cursor_position(self):
        return await self._send_cmd("get_cursor_position")

    async def type_text(self, text):
        await self._send_cmd("type_text", {"text": text})

    async def press_key(self, key):
        await self._send_cmd("press_key", {"key": key})

    async def hotkey(self, *keys):
        await self._send_cmd("hotkey", {"keys": list(keys)})

    async def to_screen_coordinates(self, x, y):
        return await self._send_cmd("to_screen_coordinates", {"x": x, "y": y})

    async def to_screenshot_coordinates(self, x, y):
        """Proxy for the server interface's inverse coordinate mapping."""
        return await self._send_cmd("to_screenshot_coordinates", {"x": x, "y": y})
|
||||
@@ -318,6 +318,10 @@ class MacOSComputerInterface(BaseComputerInterface):
|
||||
asyncio.create_task(self._ws.close())
|
||||
self._ws = None
|
||||
|
||||
async def diorama_cmd(self, action: str, arguments: dict = None) -> dict:
    """Send a diorama command to the server (macOS only).

    Args:
        action: Name of the diorama action.
        arguments: Arguments for the action; an empty dict is sent when
            this is ``None`` or empty.

    Returns:
        Whatever ``_send_command`` returns for the ``diorama_cmd`` request.
    """
    payload = {"action": action, "arguments": arguments if arguments else {}}
    return await self._send_command("diorama_cmd", payload)
|
||||
|
||||
# Mouse Actions
|
||||
async def left_click(self, x: Optional[int] = None, y: Optional[int] = None) -> None:
|
||||
await self._send_command("left_click", {"x": x, "y": y})
|
||||
|
||||
Reference in New Issue
Block a user