# Source: computer/libs/python/computer-server/computer_server/handlers/generic.py
# Snapshot: 2025-10-24 16:56:39 -07:00 (470 lines, 16 KiB, Python)

"""
Generic handlers for all OSes.
Includes:
- DesktopHandler
- FileHandler
"""
import base64
import os
import platform
import subprocess
import webbrowser
from pathlib import Path
from typing import Any, Dict, Optional
from ..utils import wallpaper
from .base import BaseDesktopHandler, BaseFileHandler, BaseWindowHandler
# pywinctl is an optional dependency: when importing it fails for any reason,
# ``pwc`` is set to None and the window handlers check for that at call time.
try:
    import pywinctl as pwc
except Exception:  # pragma: no cover
    pwc = None  # type: ignore
def resolve_path(path: str) -> Path:
    """Turn a path string into an absolute ``Path``.

    A leading ``~`` is expanded to the user's home directory before the path
    is resolved.

    Args:
        path: The file or directory path to resolve

    Returns:
        Path: The resolved absolute path
    """
    expanded = Path(path).expanduser()
    return expanded.resolve()
# ===== Cross-platform Desktop command handlers =====
class GenericDesktopHandler(BaseDesktopHandler):
    """
    Desktop-related operations that do not depend on a specific OS handler.

    Implements:
    - get_desktop_environment: detect current desktop environment
    - set_wallpaper: set desktop wallpaper path
    """

    async def get_desktop_environment(self) -> Dict[str, Any]:
        """
        Report the desktop environment detected by the wallpaper utility module.

        Returns:
            Dict containing 'success' boolean and either 'environment' string or 'error' string
        """
        try:
            detected = wallpaper.get_desktop_environment()
        except Exception as e:
            return {"success": False, "error": str(e)}
        return {"success": True, "environment": detected}

    async def set_wallpaper(self, path: str) -> Dict[str, Any]:
        """
        Use the file at ``path`` as the desktop wallpaper.

        The path is resolved to an absolute path (with ``~`` expanded) before
        it is handed to the wallpaper utility.

        Args:
            path: The file path to set as wallpaper

        Returns:
            Dict containing 'success' boolean and optionally 'error' string
        """
        try:
            target = str(resolve_path(path))
            applied = bool(wallpaper.set_wallpaper(target))
        except Exception as e:
            return {"success": False, "error": str(e)}
        return {"success": applied}
# ===== Cross-platform window control command handlers =====
class GenericWindowHandler(BaseWindowHandler):
    """
    Cross-platform window management using pywinctl where possible.

    Every public coroutine returns a dict with a 'success' boolean. Failures
    are reported through an 'error' string rather than by raising.
    """

    async def open(self, target: str) -> Dict[str, Any]:
        """
        Open a URL in the web browser, or a path with the OS default opener.

        Targets starting with ``http://`` or ``https://`` go through
        ``webbrowser.open``. Anything else is resolved to an absolute path and
        handed to ``open`` (macOS), ``xdg-open`` (Linux) or ``os.startfile``
        (Windows).

        Args:
            target: URL or file/directory path to open

        Returns:
            Dict containing 'success' boolean and optionally 'error' string
        """
        try:
            if target.startswith(("http://", "https://")):
                ok = webbrowser.open(target)
                return {"success": bool(ok)}

            path = str(resolve_path(target))
            system = platform.system().lower()
            # The opener process is spawned but not awaited, so success here
            # only means the opener could be started.
            if system == "darwin":
                subprocess.Popen(["open", path])
            elif system == "linux":
                subprocess.Popen(["xdg-open", path])
            elif system == "windows":
                os.startfile(path)  # type: ignore[attr-defined]
            else:
                return {"success": False, "error": f"Unsupported OS: {system}"}
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def launch(self, app: str, args: Optional[list[str]] = None) -> Dict[str, Any]:
        """
        Start an application without waiting for it to exit.

        Args:
            app: Executable to run, or a full command line when ``args`` is
                empty or omitted
            args: Optional argument list passed to the executable as-is

        Returns:
            Dict containing 'success' boolean and either 'pid' integer or 'error' string
        """
        try:
            if args:
                proc = subprocess.Popen([app, *args])
            else:
                # allow shell command like "libreoffice --writer"
                # NOTE(security): with shell=True the whole string is
                # interpreted by the shell; `app` must come from a trusted caller.
                proc = subprocess.Popen(app, shell=True)
            return {"success": True, "pid": proc.pid}
        except Exception as e:
            return {"success": False, "error": str(e)}

    def _get_window_by_id(self, window_id: int | str):
        """
        Find the pywinctl window whose native handle matches ``window_id``.

        Handles are compared as strings so that an id which arrived as text
        (e.g. ``"12345"``) still matches an integer handle.

        Args:
            window_id: Handle as returned by ``getHandle()``, as int or str

        Returns:
            The matching window object, or None if no window matches or the
            lookup itself fails

        Raises:
            RuntimeError: If pywinctl could not be imported
        """
        if pwc is None:
            raise RuntimeError("pywinctl not available")
        try:
            wanted = str(window_id).strip()
            # getAllWindowsDict() is keyed by application name, not by handle,
            # so the window objects are scanned instead.
            for candidate in pwc.getAllWindows():
                if str(candidate.getHandle()) == wanted:
                    return candidate
        except Exception:
            # Best effort: a failing backend is reported as "not found".
            return None
        return None

    async def get_current_window_id(self) -> Dict[str, Any]:
        """
        Get the handle of the currently active window.

        Returns:
            Dict containing 'success' boolean and either 'window_id' or 'error' string
        """
        try:
            if pwc is None:
                return {"success": False, "error": "pywinctl not available"}
            win = pwc.getActiveWindow()
            if not win:
                return {"success": False, "error": "No active window"}
            return {"success": True, "window_id": win.getHandle()}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def get_application_windows(self, app: str) -> Dict[str, Any]:
        """
        List handles of windows whose title contains ``app`` (case-insensitive).

        Args:
            app: Text to look for in window titles

        Returns:
            Dict containing 'success' boolean and either 'windows' list of handles or 'error' string
        """
        try:
            if pwc is None:
                return {"success": False, "error": "pywinctl not available"}
            wins = pwc.getWindowsWithTitle(
                app, condition=pwc.Re.CONTAINS, flags=pwc.Re.IGNORECASE
            )
            ids = [w.getHandle() for w in wins]
            return {"success": True, "windows": ids}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def get_window_name(self, window_id: int | str) -> Dict[str, Any]:
        """
        Get the title of a window.

        Args:
            window_id: Handle of the window

        Returns:
            Dict containing 'success' boolean and either 'name' string or 'error' string
        """
        try:
            if pwc is None:
                return {"success": False, "error": "pywinctl not available"}
            w = self._get_window_by_id(window_id)
            if not w:
                return {"success": False, "error": "Window not found"}
            return {"success": True, "name": w.title}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def get_window_size(self, window_id: int | str) -> Dict[str, Any]:
        """
        Get the size of a window.

        Args:
            window_id: Handle of the window

        Returns:
            Dict containing 'success' boolean and either 'width'/'height' integers or 'error' string
        """
        try:
            if pwc is None:
                return {"success": False, "error": "pywinctl not available"}
            w = self._get_window_by_id(window_id)
            if not w:
                return {"success": False, "error": "Window not found"}
            width, height = w.size
            return {"success": True, "width": int(width), "height": int(height)}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def get_window_position(self, window_id: int | str) -> Dict[str, Any]:
        """
        Get the position of a window.

        Args:
            window_id: Handle of the window

        Returns:
            Dict containing 'success' boolean and either 'x'/'y' integers or 'error' string
        """
        try:
            if pwc is None:
                return {"success": False, "error": "pywinctl not available"}
            w = self._get_window_by_id(window_id)
            if not w:
                return {"success": False, "error": "Window not found"}
            x, y = w.position
            return {"success": True, "x": int(x), "y": int(y)}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def set_window_size(self, window_id: int | str, width: int, height: int) -> Dict[str, Any]:
        """
        Resize a window.

        Args:
            window_id: Handle of the window
            width: New width
            height: New height

        Returns:
            Dict containing 'success' boolean (truthiness of the resize call's
            result) and optionally 'error' string
        """
        try:
            if pwc is None:
                return {"success": False, "error": "pywinctl not available"}
            w = self._get_window_by_id(window_id)
            if not w:
                return {"success": False, "error": "Window not found"}
            ok = w.resizeTo(int(width), int(height))
            return {"success": bool(ok)}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def set_window_position(self, window_id: int | str, x: int, y: int) -> Dict[str, Any]:
        """
        Move a window.

        Args:
            window_id: Handle of the window
            x: New x coordinate
            y: New y coordinate

        Returns:
            Dict containing 'success' boolean (truthiness of the move call's
            result) and optionally 'error' string
        """
        try:
            if pwc is None:
                return {"success": False, "error": "pywinctl not available"}
            w = self._get_window_by_id(window_id)
            if not w:
                return {"success": False, "error": "Window not found"}
            ok = w.moveTo(int(x), int(y))
            return {"success": bool(ok)}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def maximize_window(self, window_id: int | str) -> Dict[str, Any]:
        """
        Maximize a window.

        Args:
            window_id: Handle of the window

        Returns:
            Dict containing 'success' boolean and optionally 'error' string
        """
        try:
            if pwc is None:
                return {"success": False, "error": "pywinctl not available"}
            w = self._get_window_by_id(window_id)
            if not w:
                return {"success": False, "error": "Window not found"}
            ok = w.maximize()
            return {"success": bool(ok)}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def minimize_window(self, window_id: int | str) -> Dict[str, Any]:
        """
        Minimize a window.

        Args:
            window_id: Handle of the window

        Returns:
            Dict containing 'success' boolean and optionally 'error' string
        """
        try:
            if pwc is None:
                return {"success": False, "error": "pywinctl not available"}
            w = self._get_window_by_id(window_id)
            if not w:
                return {"success": False, "error": "Window not found"}
            ok = w.minimize()
            return {"success": bool(ok)}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def activate_window(self, window_id: int | str) -> Dict[str, Any]:
        """
        Activate a window.

        Args:
            window_id: Handle of the window

        Returns:
            Dict containing 'success' boolean and optionally 'error' string
        """
        try:
            if pwc is None:
                return {"success": False, "error": "pywinctl not available"}
            w = self._get_window_by_id(window_id)
            if not w:
                return {"success": False, "error": "Window not found"}
            ok = w.activate()
            return {"success": bool(ok)}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def close_window(self, window_id: int | str) -> Dict[str, Any]:
        """
        Close a window.

        Args:
            window_id: Handle of the window

        Returns:
            Dict containing 'success' boolean and optionally 'error' string
        """
        try:
            if pwc is None:
                return {"success": False, "error": "pywinctl not available"}
            w = self._get_window_by_id(window_id)
            if not w:
                return {"success": False, "error": "Window not found"}
            ok = w.close()
            return {"success": bool(ok)}
        except Exception as e:
            return {"success": False, "error": str(e)}
# ===== Cross-platform file system command handlers =====
class GenericFileHandler(BaseFileHandler):
    """
    File system operations shared by all operating systems.

    Implements the BaseFileHandler interface: existence checks, directory
    listing, text and binary reads/writes, size queries, and creation and
    deletion of files and directories. Every method resolves its path
    argument first and reports failures through an 'error' string instead of
    raising.
    """

    async def file_exists(self, path: str) -> Dict[str, Any]:
        """
        Tell whether ``path`` points at an existing regular file.

        Args:
            path: The file path to check

        Returns:
            Dict containing 'success' boolean and either 'exists' boolean or 'error' string
        """
        try:
            found = resolve_path(path).is_file()
        except Exception as e:
            return {"success": False, "error": str(e)}
        return {"success": True, "exists": found}

    async def directory_exists(self, path: str) -> Dict[str, Any]:
        """
        Tell whether ``path`` points at an existing directory.

        Args:
            path: The directory path to check

        Returns:
            Dict containing 'success' boolean and either 'exists' boolean or 'error' string
        """
        try:
            found = resolve_path(path).is_dir()
        except Exception as e:
            return {"success": False, "error": str(e)}
        return {"success": True, "exists": found}

    async def list_dir(self, path: str) -> Dict[str, Any]:
        """
        Return the names of the files and directories directly inside ``path``.

        Entries that are neither a file nor a directory are left out.

        Args:
            path: The directory path to list

        Returns:
            Dict containing 'success' boolean and either 'files' list of names or 'error' string
        """
        try:
            names = []
            for entry in resolve_path(path).iterdir():
                if entry.is_file() or entry.is_dir():
                    names.append(entry.name)
            return {"success": True, "files": names}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def read_text(self, path: str) -> Dict[str, Any]:
        """
        Read a file as text.

        Args:
            path: The file path to read from

        Returns:
            Dict containing 'success' boolean and either 'content' string or 'error' string
        """
        try:
            text = resolve_path(path).read_text()
        except Exception as e:
            return {"success": False, "error": str(e)}
        return {"success": True, "content": text}

    async def write_text(self, path: str, content: str) -> Dict[str, Any]:
        """
        Write ``content`` to a file as text.

        Args:
            path: The file path to write to
            content: The text content to write

        Returns:
            Dict containing 'success' boolean and optionally 'error' string
        """
        try:
            destination = resolve_path(path)
            destination.write_text(content)
        except Exception as e:
            return {"success": False, "error": str(e)}
        return {"success": True}

    async def write_bytes(
        self, path: str, content_b64: str, append: bool = False
    ) -> Dict[str, Any]:
        """
        Decode a base64 string and write the resulting bytes to a file.

        Args:
            path: The file path to write to
            content_b64: Base64 encoded binary content
            append: If True, append to existing file; if False, overwrite

        Returns:
            Dict containing 'success' boolean and optionally 'error' string
        """
        try:
            if append:
                mode = "ab"
            else:
                mode = "wb"
            # The file is opened before the payload is decoded, matching the
            # established order of side effects.
            with open(resolve_path(path), mode) as out:
                out.write(base64.b64decode(content_b64))
        except Exception as e:
            return {"success": False, "error": str(e)}
        return {"success": True}

    async def read_bytes(
        self, path: str, offset: int = 0, length: Optional[int] = None
    ) -> Dict[str, Any]:
        """
        Read bytes from a file and return them base64 encoded.

        Args:
            path: The file path to read from
            offset: Byte offset to start reading from
            length: Number of bytes to read; if None, read entire file from offset

        Returns:
            Dict containing 'success' boolean and either 'content_b64' string or 'error' string
        """
        try:
            with open(resolve_path(path), "rb") as src:
                # Offsets of zero or below leave the read position at the start.
                if offset > 0:
                    src.seek(offset)
                data = src.read() if length is None else src.read(length)
            encoded = base64.b64encode(data).decode("utf-8")
            return {"success": True, "content_b64": encoded}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def get_file_size(self, path: str) -> Dict[str, Any]:
        """
        Report the size in bytes of the file at ``path``.

        Args:
            path: The file path to get size for

        Returns:
            Dict containing 'success' boolean and either 'size' integer or 'error' string
        """
        try:
            return {"success": True, "size": resolve_path(path).stat().st_size}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def delete_file(self, path: str) -> Dict[str, Any]:
        """
        Remove the file at ``path``.

        Args:
            path: The file path to delete

        Returns:
            Dict containing 'success' boolean and optionally 'error' string
        """
        try:
            victim = resolve_path(path)
            victim.unlink()
        except Exception as e:
            return {"success": False, "error": str(e)}
        return {"success": True}

    async def create_dir(self, path: str) -> Dict[str, Any]:
        """
        Create the directory at ``path``.

        Missing parent directories are created as well, and an already
        existing directory is not treated as an error.

        Args:
            path: The directory path to create

        Returns:
            Dict containing 'success' boolean and optionally 'error' string
        """
        try:
            new_dir = resolve_path(path)
            new_dir.mkdir(parents=True, exist_ok=True)
        except Exception as e:
            return {"success": False, "error": str(e)}
        return {"success": True}

    async def delete_dir(self, path: str) -> Dict[str, Any]:
        """
        Remove the empty directory at ``path``.

        Args:
            path: The directory path to delete

        Returns:
            Dict containing 'success' boolean and optionally 'error' string
        """
        try:
            old_dir = resolve_path(path)
            old_dir.rmdir()
        except Exception as e:
            return {"success": False, "error": str(e)}
        return {"success": True}