fix python rpc with multiline input args

This commit is contained in:
Dillon DuPont
2025-11-01 20:50:58 -04:00
parent 07ad8dc351
commit 5f58ede106
2 changed files with 419 additions and 16 deletions

View File

@@ -7,7 +7,15 @@ import platform
import re
import time
import traceback
from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Union, cast
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Literal, Optional, Union, cast, TypeVar
from functools import wraps
try:
from typing import ParamSpec
except Exception: # pragma: no cover
from typing_extensions import ParamSpec # type: ignore
P = ParamSpec("P")
R = TypeVar("R")
from core.telemetry import is_telemetry_enabled, record_event
from PIL import Image
@@ -980,7 +988,7 @@ class Computer:
else:
# POSIX (macOS/Linux)
venv_path = f"$HOME/.venvs/{venv_name}"
create_cmd = f'mkdir -p "$HOME/.venvs" && python3 -m venv "{venv_path}"'
create_cmd = f'mkdir -p "$HOME/.venvs" && python -m venv "{venv_path}"'
# Check if venv exists, if not create it
check_cmd = f'test -d "{venv_path}" || ({create_cmd})'
_ = await self.interface.run_command(check_cmd)
@@ -991,8 +999,26 @@ class Computer:
if requirements_str
else "echo No requirements to install"
)
return await self.interface.run_command(install_cmd)
return await self.interface.run_command(install_cmd)
async def pip_install(self, requirements: list[str]):
"""Install packages using the system Python/pip (no venv).
Args:
requirements: List of package requirements to install globally/user site.
Returns:
Tuple of (stdout, stderr) from the installation command
"""
requirements = requirements or []
if not requirements:
return await self.interface.run_command("echo No requirements to install")
# Use python -m pip for cross-platform consistency
reqs = " ".join(requirements)
install_cmd = f"python -m pip install {reqs}"
return await self.interface.run_command(install_cmd)
async def venv_cmd(self, venv_name: str, command: str):
"""Execute a shell command in a virtual environment.
@@ -1068,19 +1094,23 @@ class Computer:
raise Exception(f"Failed to reconstruct function source: {e}")
# Create Python code that will define and execute the function
args_b64 = base64.b64encode(args_json.encode("utf-8")).decode("ascii")
kwargs_b64 = base64.b64encode(kwargs_json.encode("utf-8")).decode("ascii")
python_code = f'''
import json
import traceback
import base64
try:
# Define the function from source
{textwrap.indent(func_source, " ")}
# Deserialize args and kwargs from JSON
args_json = """{args_json}"""
kwargs_json = """{kwargs_json}"""
args = json.loads(args_json)
kwargs = json.loads(kwargs_json)
# Deserialize args and kwargs from base64 JSON
_args_b64 = """{args_b64}"""
_kwargs_b64 = """{kwargs_b64}"""
args = json.loads(base64.b64decode(_args_b64).decode('utf-8'))
kwargs = json.loads(base64.b64decode(_kwargs_b64).decode('utf-8'))
# Execute the function
result = {func_name}(*args, **kwargs)
@@ -1144,10 +1174,21 @@ print(f"<<<VENV_EXEC_START>>>{{output_json}}<<<VENV_EXEC_END>>>")
if output_payload["success"]:
return output_payload["result"]
else:
import builtins
# Recreate and raise the original exception
error_info = output_payload["error"]
error_class = eval(error_info["type"])
raise error_class(error_info["message"])
error_info = output_payload.get("error", {}) or {}
err_type = error_info.get("type") or "Exception"
err_msg = error_info.get("message") or ""
err_tb = error_info.get("traceback") or ""
exc_cls = getattr(builtins, err_type, None)
if isinstance(exc_cls, type) and issubclass(exc_cls, BaseException):
# Built-in exception: rethrow with remote traceback appended
raise exc_cls(f"{err_msg}\n\nRemote traceback:\n{err_tb}")
else:
# Non built-in: raise a safe local error carrying full remote context
raise RuntimeError(f"{err_type}: {err_msg}\n\nRemote traceback:\n{err_tb}")
else:
raise Exception("Invalid output format: markers found but no content between them")
else:
@@ -1155,3 +1196,348 @@ print(f"<<<VENV_EXEC_START>>>{{output_json}}<<<VENV_EXEC_END>>>")
raise Exception(
f"No output payload found. stdout: {result.stdout}, stderr: {result.stderr}"
)
async def venv_exec_background(self, venv_name: str, python_func, *args, requirements: Optional[List[str]] = None, **kwargs) -> int:
"""Run the Python function in the venv in the background and return the PID.
Uses a short launcher Python that spawns a detached child and exits immediately.
"""
import base64
import inspect
import json
import textwrap
import time as _time
try:
source = inspect.getsource(python_func)
func_source = textwrap.dedent(source).strip()
while func_source.lstrip().startswith("@"):
func_source = func_source.split("\n", 1)[1].strip()
func_name = python_func.__name__
args_json = json.dumps(args, default=str)
kwargs_json = json.dumps(kwargs, default=str)
except OSError as e:
raise Exception(f"Cannot retrieve source code for function {python_func.__name__}: {e}")
except Exception as e:
raise Exception(f"Failed to reconstruct function source: {e}")
reqs_list = requirements or []
reqs_json = json.dumps(reqs_list)
# Create Python code that will define and execute the function
args_b64 = base64.b64encode(args_json.encode("utf-8")).decode("ascii")
kwargs_b64 = base64.b64encode(kwargs_json.encode("utf-8")).decode("ascii")
payload_code = f'''
import json
import traceback
import base64
try:
# Define the function from source
{textwrap.indent(func_source, " ")}
# Deserialize args and kwargs from base64 JSON
_args_b64 = """{args_b64}"""
_kwargs_b64 = """{kwargs_b64}"""
args = json.loads(base64.b64decode(_args_b64).decode('utf-8'))
kwargs = json.loads(base64.b64decode(_kwargs_b64).decode('utf-8'))
# Ensure requirements inside the active venv
for pkg in json.loads(''' + repr(reqs_json) + '''):
if pkg:
import subprocess, sys
subprocess.run([sys.executable, '-m', 'pip', 'install', pkg], check=False)
_ = {func_name}(*args, **kwargs)
except Exception:
import sys
sys.stderr.write(traceback.format_exc())
'''
payload_b64 = base64.b64encode(payload_code.encode("utf-8")).decode("ascii")
if self.os_type == "windows":
# Launcher spawns detached child and prints its PID
launcher_code = f'''
import base64, subprocess, os, sys
DETACHED_PROCESS = 0x00000008
CREATE_NEW_PROCESS_GROUP = 0x00000200
creationflags = DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP
code = base64.b64decode("{payload_b64}").decode("utf-8")
p = subprocess.Popen(["python", "-c", code], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=creationflags)
print(p.pid)
'''
launcher_b64 = base64.b64encode(launcher_code.encode("utf-8")).decode("ascii")
venv_path = f"%USERPROFILE%\\.venvs\\{venv_name}"
cmd = (
'cmd /c "'
f'call "{venv_path}\\Scripts\\activate.bat" && '
f'python -c "import base64; exec(base64.b64decode(\'{launcher_b64}\').decode(\'utf-8\'))"'
'"'
)
result = await self.interface.run_command(cmd)
pid_str = (result.stdout or "").strip().splitlines()[-1].strip()
return int(pid_str)
else:
log = f"/tmp/cua_bg_{int(_time.time())}.log"
launcher_code = f'''
import base64, subprocess, os, sys
code = base64.b64decode("{payload_b64}").decode("utf-8")
with open("{log}", "ab", buffering=0) as f:
p = subprocess.Popen(["python", "-c", code], stdout=f, stderr=subprocess.STDOUT, preexec_fn=getattr(os, "setsid", None))
print(p.pid)
'''
launcher_b64 = base64.b64encode(launcher_code.encode("utf-8")).decode("ascii")
venv_path = f"$HOME/.venvs/{venv_name}"
shell = (
f'. "{venv_path}/bin/activate" && '
f'python -c "import base64; exec(base64.b64decode(\'{launcher_b64}\').decode(\'utf-8\'))"'
)
result = await self.interface.run_command(shell)
pid_str = (result.stdout or "").strip().splitlines()[-1].strip()
return int(pid_str)
async def python_exec(self, python_func, *args, **kwargs):
"""Execute a Python function using the system Python (no venv).
Uses source extraction and base64 transport, mirroring venv_exec but
without virtual environment activation.
Returns the function result or raises a reconstructed exception with
remote traceback context appended.
"""
import base64
import inspect
import json
import textwrap
try:
source = inspect.getsource(python_func)
func_source = textwrap.dedent(source).strip()
while func_source.lstrip().startswith("@"):
func_source = func_source.split("\n", 1)[1].strip()
func_name = python_func.__name__
args_json = json.dumps(args, default=str)
kwargs_json = json.dumps(kwargs, default=str)
except OSError as e:
raise Exception(f"Cannot retrieve source code for function {python_func.__name__}: {e}")
except Exception as e:
raise Exception(f"Failed to reconstruct function source: {e}")
# Create Python code that will define and execute the function
args_b64 = base64.b64encode(args_json.encode("utf-8")).decode("ascii")
kwargs_b64 = base64.b64encode(kwargs_json.encode("utf-8")).decode("ascii")
python_code = f'''
import json
import traceback
import base64
try:
# Define the function from source
{textwrap.indent(func_source, " ")}
# Deserialize args and kwargs from base64 JSON
_args_b64 = """{args_b64}"""
_kwargs_b64 = """{kwargs_b64}"""
args = json.loads(base64.b64decode(_args_b64).decode('utf-8'))
kwargs = json.loads(base64.b64decode(_kwargs_b64).decode('utf-8'))
# Execute the function
result = {func_name}(*args, **kwargs)
# Create success output payload
output_payload = {{
"success": True,
"result": result,
"error": None
}}
except Exception as e:
# Create error output payload
output_payload = {{
"success": False,
"result": None,
"error": {{
"type": type(e).__name__,
"message": str(e),
"traceback": traceback.format_exc()
}}
}}
# Serialize the output payload as JSON
import json
output_json = json.dumps(output_payload, default=str)
# Print the JSON output with markers
print(f"<<<VENV_EXEC_START>>>{{output_json}}<<<VENV_EXEC_END>>>")
'''
encoded_code = base64.b64encode(python_code.encode("utf-8")).decode("ascii")
python_command = (
f"python -c \"import base64; exec(base64.b64decode('{encoded_code}').decode('utf-8'))\""
)
result = await self.interface.run_command(python_command)
start_marker = "<<<VENV_EXEC_START>>>"
end_marker = "<<<VENV_EXEC_END>>>"
print(result.stdout[: result.stdout.find(start_marker)])
if start_marker in result.stdout and end_marker in result.stdout:
start_idx = result.stdout.find(start_marker) + len(start_marker)
end_idx = result.stdout.find(end_marker)
if start_idx < end_idx:
output_json = result.stdout[start_idx:end_idx]
try:
output_payload = json.loads(output_json)
except Exception as e:
raise Exception(f"Failed to decode output payload: {e}")
if output_payload["success"]:
return output_payload["result"]
else:
import builtins
error_info = output_payload.get("error", {}) or {}
err_type = error_info.get("type") or "Exception"
err_msg = error_info.get("message") or ""
err_tb = error_info.get("traceback") or ""
exc_cls = getattr(builtins, err_type, None)
if isinstance(exc_cls, type) and issubclass(exc_cls, BaseException):
raise exc_cls(f"{err_msg}\n\nRemote traceback:\n{err_tb}")
else:
raise RuntimeError(f"{err_type}: {err_msg}\n\nRemote traceback:\n{err_tb}")
else:
raise Exception("Invalid output format: markers found but no content between them")
else:
raise Exception(
f"No output payload found. stdout: {result.stdout}, stderr: {result.stderr}"
)
async def python_exec_background(self, python_func, *args, requirements: Optional[List[str]] = None, **kwargs) -> int:
"""Run a Python function with the system interpreter in the background and return PID.
Uses a short launcher Python that spawns a detached child and exits immediately.
"""
import base64
import inspect
import json
import textwrap
import time as _time
try:
source = inspect.getsource(python_func)
func_source = textwrap.dedent(source).strip()
while func_source.lstrip().startswith("@"):
func_source = func_source.split("\n", 1)[1].strip()
func_name = python_func.__name__
args_json = json.dumps(args, default=str)
kwargs_json = json.dumps(kwargs, default=str)
except OSError as e:
raise Exception(f"Cannot retrieve source code for function {python_func.__name__}: {e}")
except Exception as e:
raise Exception(f"Failed to reconstruct function source: {e}")
# Create Python code that will define and execute the function
args_b64 = base64.b64encode(args_json.encode("utf-8")).decode("ascii")
kwargs_b64 = base64.b64encode(kwargs_json.encode("utf-8")).decode("ascii")
payload_code = f'''
import json
import traceback
import base64
try:
# Define the function from source
{textwrap.indent(func_source, " ")}
# Deserialize args and kwargs from base64 JSON
_args_b64 = """{args_b64}"""
_kwargs_b64 = """{kwargs_b64}"""
args = json.loads(base64.b64decode(_args_b64).decode('utf-8'))
kwargs = json.loads(base64.b64decode(_kwargs_b64).decode('utf-8'))
_ = {func_name}(*args, **kwargs)
except Exception:
import sys
sys.stderr.write(traceback.format_exc())
'''
payload_b64 = base64.b64encode(payload_code.encode("utf-8")).decode("ascii")
if self.os_type == "windows":
launcher_code = f'''
import base64, subprocess, os, sys
DETACHED_PROCESS = 0x00000008
CREATE_NEW_PROCESS_GROUP = 0x00000200
creationflags = DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP
code = base64.b64decode("{payload_b64}").decode("utf-8")
p = subprocess.Popen(["python", "-c", code], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=creationflags)
print(p.pid)
'''
launcher_b64 = base64.b64encode(launcher_code.encode("utf-8")).decode("ascii")
cmd = f"python -c \"import base64; exec(base64.b64decode('{launcher_b64}').decode('utf-8'))\""
result = await self.interface.run_command(cmd)
pid_str = (result.stdout or "").strip().splitlines()[-1].strip()
return int(pid_str)
else:
log = f"/tmp/cua_bg_{int(_time.time())}.log"
launcher_code = f'''
import base64, subprocess, os, sys
code = base64.b64decode("{payload_b64}").decode("utf-8")
with open("{log}", "ab", buffering=0) as f:
p = subprocess.Popen(["python", "-c", code], stdout=f, stderr=subprocess.STDOUT, preexec_fn=getattr(os, "setsid", None))
print(p.pid)
'''
launcher_b64 = base64.b64encode(launcher_code.encode("utf-8")).decode("ascii")
cmd = f"python -c \"import base64; exec(base64.b64decode('{launcher_b64}').decode('utf-8'))\""
result = await self.interface.run_command(cmd)
pid_str = (result.stdout or "").strip().splitlines()[-1].strip()
return int(pid_str)
def python_command(
self,
requirements: Optional[List[str]] = None,
*,
venv_name: str = "default",
use_system_python: bool = False,
background: bool = False,
) -> Callable[[Callable[P, R]], Callable[P, Awaitable[R]]]:
"""Decorator to execute a Python function remotely in this Computer's venv.
This mirrors `computer.helpers.sandboxed()` but binds to this instance and
optionally ensures required packages are installed before execution.
Args:
requirements: Packages to install in the virtual environment.
venv_name: Name of the virtual environment to use.
use_system_python: If True, use the system Python/pip instead of a venv.
background: If True, run the function detached and return the child PID immediately.
Returns:
A decorator that turns a local function into an async callable which
runs remotely and returns the function's result.
"""
reqs = list(requirements or [])
def decorator(func: Callable[P, R]) -> Callable[P, Awaitable[R]]:
@wraps(func)
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
if use_system_python:
# For background, avoid blocking installs; install inside child process
if background:
return await self.python_exec_background(func, *args, requirements=reqs, **kwargs) # type: ignore[return-value]
# Foreground: install first, then execute
if reqs:
await self.pip_install(reqs)
return await self.python_exec(func, *args, **kwargs)
else:
# For background, avoid blocking installs; install inside child process under venv
if background:
return await self.venv_exec_background(venv_name, func, *args, requirements=reqs, **kwargs) # type: ignore[return-value]
# Foreground: ensure venv and install, then execute
await self.venv_install(venv_name, reqs)
return await self.venv_exec(venv_name, func, *args, **kwargs)
return wrapper
return decorator

View File

@@ -5,7 +5,17 @@ Helper functions and decorators for the Computer module.
import asyncio
import logging
from functools import wraps
from typing import Any, Callable, Optional, TypeVar, cast
from typing import Any, Awaitable, Callable, Optional, TypeVar
try:
# Python 3.12+ has ParamSpec in typing
from typing import ParamSpec
except ImportError: # pragma: no cover
# Fallback for environments without ParamSpec in typing
from typing_extensions import ParamSpec # type: ignore
P = ParamSpec("P")
R = TypeVar("R")
# Global reference to the default computer instance
_default_computer = None
@@ -13,7 +23,7 @@ _default_computer = None
logger = logging.getLogger(__name__)
def set_default_computer(computer):
def set_default_computer(computer: Any) -> None:
"""
Set the default computer instance to be used by the remote decorator.
@@ -24,7 +34,11 @@ def set_default_computer(computer):
_default_computer = computer
def sandboxed(venv_name: str = "default", computer: str = "default", max_retries: int = 3):
def sandboxed(
venv_name: str = "default",
computer: str = "default",
max_retries: int = 3,
) -> Callable[[Callable[P, R]], Callable[P, Awaitable[R]]]:
"""
Decorator that wraps a function to be executed remotely via computer.venv_exec
@@ -34,9 +48,9 @@ def sandboxed(venv_name: str = "default", computer: str = "default", max_retries
max_retries: Maximum number of retries for the remote execution
"""
def decorator(func):
def decorator(func: Callable[P, R]) -> Callable[P, Awaitable[R]]:
@wraps(func)
async def wrapper(*args, **kwargs):
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
# Determine which computer instance to use
comp = computer if computer != "default" else _default_computer
@@ -54,6 +68,9 @@ def sandboxed(venv_name: str = "default", computer: str = "default", max_retries
if i == max_retries - 1:
raise e
# Should be unreachable because we either returned or raised
raise RuntimeError("sandboxed wrapper reached unreachable code path")
return wrapper
return decorator