diff --git a/libs/python/computer/computer/computer.py b/libs/python/computer/computer/computer.py index 1fbb38e5..7bf0717f 100644 --- a/libs/python/computer/computer/computer.py +++ b/libs/python/computer/computer/computer.py @@ -7,7 +7,15 @@ import platform import re import time import traceback -from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Union, cast +from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Literal, Optional, Union, cast, TypeVar +from functools import wraps +try: + from typing import ParamSpec +except Exception: # pragma: no cover + from typing_extensions import ParamSpec # type: ignore + +P = ParamSpec("P") +R = TypeVar("R") from core.telemetry import is_telemetry_enabled, record_event from PIL import Image @@ -980,7 +988,7 @@ class Computer: else: # POSIX (macOS/Linux) venv_path = f"$HOME/.venvs/{venv_name}" - create_cmd = f'mkdir -p "$HOME/.venvs" && python3 -m venv "{venv_path}"' + create_cmd = f'mkdir -p "$HOME/.venvs" && python -m venv "{venv_path}"' # Check if venv exists, if not create it check_cmd = f'test -d "{venv_path}" || ({create_cmd})' _ = await self.interface.run_command(check_cmd) @@ -991,8 +999,26 @@ class Computer: if requirements_str else "echo No requirements to install" ) - return await self.interface.run_command(install_cmd) + return await self.interface.run_command(install_cmd) + + async def pip_install(self, requirements: list[str]): + """Install packages using the system Python/pip (no venv). + Args: + requirements: List of package requirements to install globally/user site. + + Returns: + Tuple of (stdout, stderr) from the installation command + """ + requirements = requirements or [] + if not requirements: + return await self.interface.run_command("echo No requirements to install") + + # Use python -m pip for cross-platform consistency + reqs = " ".join(requirements) + install_cmd = f"python -m pip install {reqs}" + return await self.interface.run_command(install_cmd) + async def venv_cmd(self, venv_name: str, command: str): """Execute a shell command in a virtual environment. @@ -1068,19 +1094,23 @@ class Computer: raise Exception(f"Failed to reconstruct function source: {e}") # Create Python code that will define and execute the function + args_b64 = base64.b64encode(args_json.encode("utf-8")).decode("ascii") + kwargs_b64 = base64.b64encode(kwargs_json.encode("utf-8")).decode("ascii") + python_code = f''' import json import traceback +import base64 try: # Define the function from source {textwrap.indent(func_source, " ")} - # Deserialize args and kwargs from JSON - args_json = """{args_json}""" - kwargs_json = """{kwargs_json}""" - args = json.loads(args_json) - kwargs = json.loads(kwargs_json) + # Deserialize args and kwargs from base64 JSON + _args_b64 = """{args_b64}""" + _kwargs_b64 = """{kwargs_b64}""" + args = json.loads(base64.b64decode(_args_b64).decode('utf-8')) + kwargs = json.loads(base64.b64decode(_kwargs_b64).decode('utf-8')) # Execute the function result = {func_name}(*args, **kwargs) @@ -1144,10 +1174,21 @@ print(f"<<>>{{output_json}}<<>>") if output_payload["success"]: return output_payload["result"] else: + import builtins + # Recreate and raise the original exception - error_info = output_payload["error"] - error_class = eval(error_info["type"]) - raise error_class(error_info["message"]) + error_info = output_payload.get("error", {}) or {} + err_type = error_info.get("type") or "Exception" + err_msg = error_info.get("message") or "" + err_tb = error_info.get("traceback") or "" + + exc_cls = getattr(builtins, err_type, None) + if isinstance(exc_cls, type) and issubclass(exc_cls, BaseException): + # Built-in exception: rethrow with remote traceback appended + raise exc_cls(f"{err_msg}\n\nRemote traceback:\n{err_tb}") + else: + # Non built-in: raise a safe local error carrying full remote context + raise RuntimeError(f"{err_type}: {err_msg}\n\nRemote traceback:\n{err_tb}") else: raise Exception("Invalid output format: markers found but no content between them") else: @@ -1155,3 +1196,348 @@ print(f"<<>>{{output_json}}<<>>") raise Exception( f"No output payload found. stdout: {result.stdout}, stderr: {result.stderr}" ) + + async def venv_exec_background(self, venv_name: str, python_func, *args, requirements: Optional[List[str]] = None, **kwargs) -> int: + """Run the Python function in the venv in the background and return the PID. + + Uses a short launcher Python that spawns a detached child and exits immediately. + """ + import base64 + import inspect + import json + import textwrap + import time as _time + + try: + source = inspect.getsource(python_func) + func_source = textwrap.dedent(source).strip() + while func_source.lstrip().startswith("@"): + func_source = func_source.split("\n", 1)[1].strip() + func_name = python_func.__name__ + args_json = json.dumps(args, default=str) + kwargs_json = json.dumps(kwargs, default=str) + except OSError as e: + raise Exception(f"Cannot retrieve source code for function {python_func.__name__}: {e}") + except Exception as e: + raise Exception(f"Failed to reconstruct function source: {e}") + + reqs_list = requirements or [] + reqs_json = json.dumps(reqs_list) + + # Create Python code that will define and execute the function + args_b64 = base64.b64encode(args_json.encode("utf-8")).decode("ascii") + kwargs_b64 = base64.b64encode(kwargs_json.encode("utf-8")).decode("ascii") + + payload_code = f''' +import json +import traceback +import base64 + +try: + # Define the function from source +{textwrap.indent(func_source, " ")} + + # Deserialize args and kwargs from base64 JSON + _args_b64 = """{args_b64}""" + _kwargs_b64 = """{kwargs_b64}""" + args = json.loads(base64.b64decode(_args_b64).decode('utf-8')) + kwargs = json.loads(base64.b64decode(_kwargs_b64).decode('utf-8')) + + # Ensure requirements inside the active venv + for pkg in json.loads(''' + repr(reqs_json) + '''): + if pkg: + import subprocess, sys + subprocess.run([sys.executable, '-m', 'pip', 'install', pkg], check=False) + _ = {func_name}(*args, **kwargs) +except Exception: + import sys + sys.stderr.write(traceback.format_exc()) +''' + payload_b64 = base64.b64encode(payload_code.encode("utf-8")).decode("ascii") + + if self.os_type == "windows": + # Launcher spawns detached child and prints its PID + launcher_code = f''' +import base64, subprocess, os, sys +DETACHED_PROCESS = 0x00000008 +CREATE_NEW_PROCESS_GROUP = 0x00000200 +creationflags = DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP +code = base64.b64decode("{payload_b64}").decode("utf-8") +p = subprocess.Popen(["python", "-c", code], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=creationflags) +print(p.pid) +''' + launcher_b64 = base64.b64encode(launcher_code.encode("utf-8")).decode("ascii") + venv_path = f"%USERPROFILE%\\.venvs\\{venv_name}" + cmd = ( + 'cmd /c "' + f'call "{venv_path}\\Scripts\\activate.bat" && ' + f'python -c "import base64; exec(base64.b64decode(\'{launcher_b64}\').decode(\'utf-8\'))"' + '"' + ) + result = await self.interface.run_command(cmd) + pid_str = (result.stdout or "").strip().splitlines()[-1].strip() + return int(pid_str) + else: + log = f"/tmp/cua_bg_{int(_time.time())}.log" + launcher_code = f''' +import base64, subprocess, os, sys +code = base64.b64decode("{payload_b64}").decode("utf-8") +with open("{log}", "ab", buffering=0) as f: + p = subprocess.Popen(["python", "-c", code], stdout=f, stderr=subprocess.STDOUT, preexec_fn=getattr(os, "setsid", None)) +print(p.pid) +''' + launcher_b64 = base64.b64encode(launcher_code.encode("utf-8")).decode("ascii") + venv_path = f"$HOME/.venvs/{venv_name}" + shell = ( + f'. "{venv_path}/bin/activate" && ' + f'python -c "import base64; exec(base64.b64decode(\'{launcher_b64}\').decode(\'utf-8\'))"' + ) + result = await self.interface.run_command(shell) + pid_str = (result.stdout or "").strip().splitlines()[-1].strip() + return int(pid_str) + + async def python_exec(self, python_func, *args, **kwargs): + """Execute a Python function using the system Python (no venv). + + Uses source extraction and base64 transport, mirroring venv_exec but + without virtual environment activation. + + Returns the function result or raises a reconstructed exception with + remote traceback context appended. + """ + import base64 + import inspect + import json + import textwrap + + try: + source = inspect.getsource(python_func) + func_source = textwrap.dedent(source).strip() + while func_source.lstrip().startswith("@"): + func_source = func_source.split("\n", 1)[1].strip() + func_name = python_func.__name__ + args_json = json.dumps(args, default=str) + kwargs_json = json.dumps(kwargs, default=str) + except OSError as e: + raise Exception(f"Cannot retrieve source code for function {python_func.__name__}: {e}") + except Exception as e: + raise Exception(f"Failed to reconstruct function source: {e}") + + # Create Python code that will define and execute the function + args_b64 = base64.b64encode(args_json.encode("utf-8")).decode("ascii") + kwargs_b64 = base64.b64encode(kwargs_json.encode("utf-8")).decode("ascii") + + python_code = f''' +import json +import traceback +import base64 + +try: + # Define the function from source +{textwrap.indent(func_source, " ")} + + # Deserialize args and kwargs from base64 JSON + _args_b64 = """{args_b64}""" + _kwargs_b64 = """{kwargs_b64}""" + args = json.loads(base64.b64decode(_args_b64).decode('utf-8')) + kwargs = json.loads(base64.b64decode(_kwargs_b64).decode('utf-8')) + + # Execute the function + result = {func_name}(*args, **kwargs) + + # Create success output payload + output_payload = {{ + "success": True, + "result": result, + "error": None + }} + +except Exception as e: + # Create error output payload + output_payload = {{ + "success": False, + "result": None, + "error": {{ + "type": type(e).__name__, + "message": str(e), + "traceback": traceback.format_exc() + }} + }} + +# Serialize the output payload as JSON +import json +output_json = json.dumps(output_payload, default=str) + +# Print the JSON output with markers +print(f"<<>>{{output_json}}<<>>") +''' + + encoded_code = base64.b64encode(python_code.encode("utf-8")).decode("ascii") + python_command = ( + f"python -c \"import base64; exec(base64.b64decode('{encoded_code}').decode('utf-8'))\"" + ) + result = await self.interface.run_command(python_command) + + start_marker = "<<>>" + end_marker = "<<>>" + + print(result.stdout[: result.stdout.find(start_marker)]) + + if start_marker in result.stdout and end_marker in result.stdout: + start_idx = result.stdout.find(start_marker) + len(start_marker) + end_idx = result.stdout.find(end_marker) + if start_idx < end_idx: + output_json = result.stdout[start_idx:end_idx] + try: + output_payload = json.loads(output_json) + except Exception as e: + raise Exception(f"Failed to decode output payload: {e}") + + if output_payload["success"]: + return output_payload["result"] + else: + import builtins + error_info = output_payload.get("error", {}) or {} + err_type = error_info.get("type") or "Exception" + err_msg = error_info.get("message") or "" + err_tb = error_info.get("traceback") or "" + exc_cls = getattr(builtins, err_type, None) + if isinstance(exc_cls, type) and issubclass(exc_cls, BaseException): + raise exc_cls(f"{err_msg}\n\nRemote traceback:\n{err_tb}") + else: + raise RuntimeError(f"{err_type}: {err_msg}\n\nRemote traceback:\n{err_tb}") + else: + raise Exception("Invalid output format: markers found but no content between them") + else: + raise Exception( + f"No output payload found. stdout: {result.stdout}, stderr: {result.stderr}" + ) + + async def python_exec_background(self, python_func, *args, requirements: Optional[List[str]] = None, **kwargs) -> int: + """Run a Python function with the system interpreter in the background and return PID. + + Uses a short launcher Python that spawns a detached child and exits immediately. + """ + import base64 + import inspect + import json + import textwrap + import time as _time + + try: + source = inspect.getsource(python_func) + func_source = textwrap.dedent(source).strip() + while func_source.lstrip().startswith("@"): + func_source = func_source.split("\n", 1)[1].strip() + func_name = python_func.__name__ + args_json = json.dumps(args, default=str) + kwargs_json = json.dumps(kwargs, default=str) + except OSError as e: + raise Exception(f"Cannot retrieve source code for function {python_func.__name__}: {e}") + except Exception as e: + raise Exception(f"Failed to reconstruct function source: {e}") + + # Create Python code that will define and execute the function + args_b64 = base64.b64encode(args_json.encode("utf-8")).decode("ascii") + kwargs_b64 = base64.b64encode(kwargs_json.encode("utf-8")).decode("ascii") + + payload_code = f''' +import json +import traceback +import base64 + +try: + # Define the function from source +{textwrap.indent(func_source, " ")} + + # Deserialize args and kwargs from base64 JSON + _args_b64 = """{args_b64}""" + _kwargs_b64 = """{kwargs_b64}""" + args = json.loads(base64.b64decode(_args_b64).decode('utf-8')) + kwargs = json.loads(base64.b64decode(_kwargs_b64).decode('utf-8')) + + _ = {func_name}(*args, **kwargs) +except Exception: + import sys + sys.stderr.write(traceback.format_exc()) +''' + payload_b64 = base64.b64encode(payload_code.encode("utf-8")).decode("ascii") + + if self.os_type == "windows": + launcher_code = f''' +import base64, subprocess, os, sys +DETACHED_PROCESS = 0x00000008 +CREATE_NEW_PROCESS_GROUP = 0x00000200 +creationflags = DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP +code = base64.b64decode("{payload_b64}").decode("utf-8") +p = subprocess.Popen(["python", "-c", code], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=creationflags) +print(p.pid) +''' + launcher_b64 = base64.b64encode(launcher_code.encode("utf-8")).decode("ascii") + cmd = f"python -c \"import base64; exec(base64.b64decode('{launcher_b64}').decode('utf-8'))\"" + result = await self.interface.run_command(cmd) + pid_str = (result.stdout or "").strip().splitlines()[-1].strip() + return int(pid_str) + else: + log = f"/tmp/cua_bg_{int(_time.time())}.log" + launcher_code = f''' +import base64, subprocess, os, sys +code = base64.b64decode("{payload_b64}").decode("utf-8") +with open("{log}", "ab", buffering=0) as f: + p = subprocess.Popen(["python", "-c", code], stdout=f, stderr=subprocess.STDOUT, preexec_fn=getattr(os, "setsid", None)) +print(p.pid) +''' + launcher_b64 = base64.b64encode(launcher_code.encode("utf-8")).decode("ascii") + cmd = f"python -c \"import base64; exec(base64.b64decode('{launcher_b64}').decode('utf-8'))\"" + result = await self.interface.run_command(cmd) + pid_str = (result.stdout or "").strip().splitlines()[-1].strip() + return int(pid_str) + + def python_command( + self, + requirements: Optional[List[str]] = None, + *, + venv_name: str = "default", + use_system_python: bool = False, + background: bool = False, + ) -> Callable[[Callable[P, R]], Callable[P, Awaitable[R]]]: + """Decorator to execute a Python function remotely in this Computer's venv. + + This mirrors `computer.helpers.sandboxed()` but binds to this instance and + optionally ensures required packages are installed before execution. + + Args: + requirements: Packages to install in the virtual environment. + venv_name: Name of the virtual environment to use. + use_system_python: If True, use the system Python/pip instead of a venv. + background: If True, run the function detached and return the child PID immediately. + + Returns: + A decorator that turns a local function into an async callable which + runs remotely and returns the function's result. + """ + + reqs = list(requirements or []) + + def decorator(func: Callable[P, R]) -> Callable[P, Awaitable[R]]: + @wraps(func) + async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: + if use_system_python: + # For background, avoid blocking installs; install inside child process + if background: + return await self.python_exec_background(func, *args, requirements=reqs, **kwargs) # type: ignore[return-value] + # Foreground: install first, then execute + if reqs: + await self.pip_install(reqs) + return await self.python_exec(func, *args, **kwargs) + else: + # For background, avoid blocking installs; install inside child process under venv + if background: + return await self.venv_exec_background(venv_name, func, *args, requirements=reqs, **kwargs) # type: ignore[return-value] + # Foreground: ensure venv and install, then execute + await self.venv_install(venv_name, reqs) + return await self.venv_exec(venv_name, func, *args, **kwargs) + + return wrapper + + return decorator diff --git a/libs/python/computer/computer/helpers.py b/libs/python/computer/computer/helpers.py index 608dcbb9..29231b56 100644 --- a/libs/python/computer/computer/helpers.py +++ b/libs/python/computer/computer/helpers.py @@ -5,7 +5,17 @@ Helper functions and decorators for the Computer module. import asyncio import logging from functools import wraps -from typing import Any, Callable, Optional, TypeVar, cast +from typing import Any, Awaitable, Callable, Optional, TypeVar + +try: + # Python 3.12+ has ParamSpec in typing + from typing import ParamSpec +except ImportError: # pragma: no cover + # Fallback for environments without ParamSpec in typing + from typing_extensions import ParamSpec # type: ignore + +P = ParamSpec("P") +R = TypeVar("R") # Global reference to the default computer instance _default_computer = None @@ -13,7 +23,7 @@ _default_computer = None logger = logging.getLogger(__name__) -def set_default_computer(computer): +def set_default_computer(computer: Any) -> None: """ Set the default computer instance to be used by the remote decorator. @@ -24,7 +34,11 @@ def set_default_computer(computer): _default_computer = computer -def sandboxed(venv_name: str = "default", computer: str = "default", max_retries: int = 3): +def sandboxed( + venv_name: str = "default", + computer: str = "default", + max_retries: int = 3, +) -> Callable[[Callable[P, R]], Callable[P, Awaitable[R]]]: """ Decorator that wraps a function to be executed remotely via computer.venv_exec @@ -34,9 +48,9 @@ def sandboxed(venv_name: str = "default", computer: str = "default", max_retries max_retries: Maximum number of retries for the remote execution """ - def decorator(func): + def decorator(func: Callable[P, R]) -> Callable[P, Awaitable[R]]: @wraps(func) - async def wrapper(*args, **kwargs): + async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: # Determine which computer instance to use comp = computer if computer != "default" else _default_computer @@ -54,6 +68,9 @@ def sandboxed(venv_name: str = "default", computer: str = "default", max_retries if i == max_retries - 1: raise e + # Should be unreachable because we either returned or raised + raise RuntimeError("sandboxed wrapper reached unreachable code path") + return wrapper return decorator