Initial cloud provider impl

This commit is contained in:
Dillon DuPont
2025-05-24 17:59:54 -04:00
parent eb0e429798
commit 10c579af1b
4 changed files with 108 additions and 73 deletions

View File

@@ -76,6 +76,41 @@ Alternatively, see the [Developer Guide](./docs/Developer-Guide.md) for building
### Step 4: Use in Your Code
#### 🚀 Recommended: CloudProvider (TryCua Cloud)
```python
from computer import Computer
from agent import ComputerAgent, LLM
async def main():
# Connect to a cloud-hosted macOS VM (recommended for scale, speed, and reliability)
async with Computer(
os_type="macos",
display="1024x768",
provider_type="cloud",
api_key="YOUR_CUA_API_KEY",
name="my-macos-vm",
) as computer:
# Example: Direct control of a cloud macOS VM
await computer.interface.left_click(100, 200)
await computer.interface.type_text("Hello, world!")
screenshot_bytes = await computer.interface.screenshot()
# Example: Run an agent on the cloud VM
agent = ComputerAgent(
computer=computer,
loop="UITARS",
model=LLM(provider="MLXVLM", name="mlx-community/UI-TARS-1.5-7B-6bit")
)
await agent.run("Find the trycua/cua repository on GitHub and follow the quick start guide")
main()
```
---
#### Local VM (Apple Silicon Only)
```python
from computer import Computer
from agent import ComputerAgent, LLM

View File

@@ -38,7 +38,8 @@ class Computer:
noVNC_port: Optional[int] = 8006,
host: str = os.environ.get("PYLUME_HOST", "localhost"),
storage: Optional[str] = None,
ephemeral: bool = False
ephemeral: bool = False,
api_key: Optional[str] = None
):
"""Initialize a new Computer instance.
@@ -256,9 +257,7 @@ class Computer:
elif self.provider_type == VMProviderType.CLOUD:
self.config.vm_provider = VMProviderFactory.create_provider(
self.provider_type,
port=port,
host=host,
storage=storage,
api_key=self.api_key,
verbose=verbose,
)
else:

View File

@@ -11,90 +11,89 @@ from ..base import BaseVMProvider, VMProviderType
# Setup logging
logger = logging.getLogger(__name__)
import asyncio
import aiohttp
from urllib.parse import urlparse
class CloudProvider(BaseVMProvider):
"""Cloud VM Provider stub implementation.
This is a placeholder for a future cloud VM provider implementation.
"""
"""Cloud VM Provider implementation using /api/vm-host endpoint."""
def __init__(
self,
host: str = "localhost",
port: int = 7777,
storage: Optional[str] = None,
self,
api_key: str = None,
endpoint_url: str = "https://trycua.com/api/vm-host",
verbose: bool = False,
**kwargs,
):
"""Initialize the Cloud provider.
"""
Args:
host: Host to use for API connections (default: localhost)
port: Port for the API server (default: 7777)
storage: Path to store VM data
api_key: API key for authentication
name: Name of the VM
endpoint_url: Endpoint for the VM host API
verbose: Enable verbose logging
"""
self.host = host
self.port = port
self.storage = storage
assert api_key, "api_key required for CloudProvider"
self.api_key = api_key
self.endpoint_url = endpoint_url
self.verbose = verbose
logger.warning("CloudProvider is not yet implemented")
@property
def provider_type(self) -> VMProviderType:
"""Get the provider type."""
return VMProviderType.CLOUD
async def __aenter__(self):
"""Enter async context manager."""
logger.debug("Entering CloudProvider context")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Exit async context manager."""
logger.debug("Exiting CloudProvider context")
pass
async def get_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
"""Get VM information by name."""
logger.warning("CloudProvider.get_vm is not implemented")
return {
"name": name,
"status": "unavailable",
"message": "CloudProvider is not implemented"
}
"""Get VM VNC URL by name using the cloud API."""
headers = {"Authorization": f"Bearer {self.api_key}"}
params = {"vm_name": name}
async with aiohttp.ClientSession() as session:
async with session.get(self.endpoint_url, headers=headers, params=params) as resp:
if resp.status == 200:
vnc_url = (await resp.text()).strip()
parsed = urlparse(vnc_url)
hostname = parsed.hostname
return {"name": vm_name, "status": "available", "vnc_url": vnc_url, "hostname": hostname}
else:
try:
error = await resp.json()
except Exception:
error = {"error": await resp.text()}
return {"name": vm_name, "status": "error", **error}
async def list_vms(self) -> List[Dict[str, Any]]:
"""List all available VMs."""
logger.warning("CloudProvider.list_vms is not implemented")
return []
async def run_vm(self, image: str, name: str, run_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]:
"""Run a VM with the given options."""
logger.warning("CloudProvider.run_vm is not implemented")
return {
"name": name,
"status": "unavailable",
"message": "CloudProvider is not implemented"
}
return {"name": name, "status": "unavailable", "message": "CloudProvider is not implemented"}
async def stop_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
"""Stop a running VM."""
logger.warning("CloudProvider.stop_vm is not implemented")
return {
"name": name,
"status": "stopped",
"message": "CloudProvider is not implemented"
}
return {"name": name, "status": "stopped", "message": "CloudProvider is not implemented"}
async def update_vm(self, name: str, update_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]:
"""Update VM configuration."""
logger.warning("CloudProvider.update_vm is not implemented")
return {
"name": name,
"status": "unchanged",
"message": "CloudProvider is not implemented"
}
async def get_ip(self, name: str, storage: Optional[str] = None, retry_delay: int = 2) -> str:
"""Get the IP address of a VM."""
logger.warning("CloudProvider.get_ip is not implemented")
raise NotImplementedError("CloudProvider.get_ip is not implemented")
return {"name": name, "status": "unchanged", "message": "CloudProvider is not implemented"}
async def get_ip(self, name: Optional[str] = None, storage: Optional[str] = None, retry_delay: int = 2) -> str:
"""
Return the VM's IP address as '{vm_name}.us.vms.trycua.com'.
Uses the provided 'name' argument (the VM name requested by the caller).
Retries up to 3 times with retry_delay seconds if hostname is not available.
"""
attempts = 3
last_error = None
for attempt in range(attempts):
result = await self.get_vm(name=name, storage=storage)
hostname = result.get("hostname")
if hostname:
return hostname
last_error = result.get("error") or result
if attempt < attempts - 1:
await asyncio.sleep(retry_delay)
raise RuntimeError(f"Failed to get VM hostname after {attempts} attempts: {last_error}")

View File

@@ -22,7 +22,8 @@ class VMProviderFactory:
image: Optional[str] = None,
verbose: bool = False,
ephemeral: bool = False,
noVNC_port: Optional[int] = None
noVNC_port: Optional[int] = None,
**kwargs,
) -> BaseVMProvider:
"""Create a VM provider of the specified type.
@@ -101,12 +102,13 @@ class VMProviderFactory:
elif provider_type == VMProviderType.CLOUD:
try:
from .cloud import CloudProvider
# Return the stub implementation of CloudProvider
# Extract api_key from kwargs
api_key_final = kwargs.get('api_key')
assert api_key_final, "api_key required for CloudProvider"
return CloudProvider(
host=host,
port=port,
storage=storage,
verbose=verbose
api_key=api_key_final,
verbose=verbose,
**kwargs,
)
except ImportError as e:
logger.error(f"Failed to import CloudProvider: {e}")