Added locks to websocket interface

This commit is contained in:
Dillon DuPont
2025-06-03 14:50:56 -04:00
parent 89deb8111f
commit e16fb75ce8
2 changed files with 86 additions and 76 deletions

View File

@@ -27,6 +27,7 @@ class LinuxComputerInterface(BaseComputerInterface):
self._max_reconnect_delay = 30 # Maximum delay between reconnection attempts
self._log_connection_attempts = True # Flag to control connection attempt logging
self._authenticated = False # Track authentication status
self._command_lock = asyncio.Lock() # Lock to ensure only one command at a time
# Set logger name for Linux interface
self.logger = Logger("cua.interface.linux", LogLevel.NORMAL)
@@ -193,58 +194,62 @@ class LinuxComputerInterface(BaseComputerInterface):
retry_count = 0
last_error = None
while retry_count < max_retries:
try:
await self._ensure_connection()
if not self._ws:
raise ConnectionError("WebSocket connection is not established")
# Acquire lock to ensure only one command is processed at a time
async with self._command_lock:
self.logger.debug(f"Acquired lock for command: {command}")
while retry_count < max_retries:
try:
await self._ensure_connection()
if not self._ws:
raise ConnectionError("WebSocket connection is not established")
# Handle authentication if needed
if self.api_key and self.vm_name and not self._authenticated:
self.logger.info("Performing authentication handshake...")
auth_message = {
"command": "authenticate",
"params": {
"api_key": self.api_key,
"container_name": self.vm_name
# Handle authentication if needed
if self.api_key and self.vm_name and not self._authenticated:
self.logger.info("Performing authentication handshake...")
auth_message = {
"command": "authenticate",
"params": {
"api_key": self.api_key,
"container_name": self.vm_name
}
}
}
await self._ws.send(json.dumps(auth_message))
# Wait for authentication response
auth_response = await asyncio.wait_for(self._ws.recv(), timeout=10)
auth_result = json.loads(auth_response)
if not auth_result.get("success"):
error_msg = auth_result.get("error", "Authentication failed")
self.logger.error(f"Authentication failed: {error_msg}")
self._authenticated = False
raise ConnectionError(f"Authentication failed: {error_msg}")
self.logger.info("Authentication successful")
self._authenticated = True
await self._ws.send(json.dumps(auth_message))
# Wait for authentication response
auth_response = await asyncio.wait_for(self._ws.recv(), timeout=10)
auth_result = json.loads(auth_response)
if not auth_result.get("success"):
error_msg = auth_result.get("error", "Authentication failed")
self.logger.error(f"Authentication failed: {error_msg}")
self._authenticated = False
raise ConnectionError(f"Authentication failed: {error_msg}")
self.logger.info("Authentication successful")
self._authenticated = True
message = {"command": command, "params": params or {}}
await self._ws.send(json.dumps(message))
response = await asyncio.wait_for(self._ws.recv(), timeout=30)
return json.loads(response)
except Exception as e:
last_error = e
retry_count += 1
if retry_count < max_retries:
# Only log at debug level for intermediate retries
self.logger.debug(
f"Command '{command}' failed (attempt {retry_count}/{max_retries}): {e}"
)
await asyncio.sleep(1)
continue
else:
# Only log at error level for the final failure
self.logger.error(
f"Failed to send command '{command}' after {max_retries} retries"
)
self.logger.debug(f"Command failure details: {e}")
raise last_error if last_error else RuntimeError("Failed to send command")
message = {"command": command, "params": params or {}}
await self._ws.send(json.dumps(message))
response = await asyncio.wait_for(self._ws.recv(), timeout=30)
self.logger.debug(f"Completed command: {command}")
return json.loads(response)
except Exception as e:
last_error = e
retry_count += 1
if retry_count < max_retries:
# Only log at debug level for intermediate retries
self.logger.debug(
f"Command '{command}' failed (attempt {retry_count}/{max_retries}): {e}"
)
await asyncio.sleep(1)
continue
else:
# Only log at error level for the final failure
self.logger.error(
f"Failed to send command '{command}' after {max_retries} retries"
)
self.logger.debug(f"Command failure details: {e}")
raise last_error if last_error else RuntimeError("Failed to send command")
async def wait_for_ready(self, timeout: int = 60, interval: float = 1.0):
"""Wait for WebSocket connection to become available."""

View File

@@ -26,6 +26,7 @@ class MacOSComputerInterface(BaseComputerInterface):
self._reconnect_delay = 1 # Start with 1 second delay
self._max_reconnect_delay = 30 # Maximum delay between reconnection attempts
self._log_connection_attempts = True # Flag to control connection attempt logging
self._command_lock = asyncio.Lock() # Lock to ensure only one command at a time
# Set logger name for macOS interface
self.logger = Logger("cua.interface.macos", LogLevel.NORMAL)
@@ -219,35 +220,39 @@ class MacOSComputerInterface(BaseComputerInterface):
retry_count = 0
last_error = None
while retry_count < max_retries:
try:
await self._ensure_connection()
if not self._ws:
raise ConnectionError("WebSocket connection is not established")
# Acquire lock to ensure only one command is processed at a time
async with self._command_lock:
self.logger.debug(f"Acquired lock for command: {command}")
while retry_count < max_retries:
try:
await self._ensure_connection()
if not self._ws:
raise ConnectionError("WebSocket connection is not established")
message = {"command": command, "params": params or {}}
await self._ws.send(json.dumps(message))
response = await asyncio.wait_for(self._ws.recv(), timeout=30)
return json.loads(response)
except Exception as e:
last_error = e
retry_count += 1
if retry_count < max_retries:
# Only log at debug level for intermediate retries
self.logger.debug(
f"Command '{command}' failed (attempt {retry_count}/{max_retries}): {e}"
)
await asyncio.sleep(1)
continue
else:
# Only log at error level for the final failure
self.logger.error(
f"Failed to send command '{command}' after {max_retries} retries"
)
self.logger.debug(f"Command failure details: {e}")
raise
message = {"command": command, "params": params or {}}
await self._ws.send(json.dumps(message))
response = await asyncio.wait_for(self._ws.recv(), timeout=30)
self.logger.debug(f"Completed command: {command}")
return json.loads(response)
except Exception as e:
last_error = e
retry_count += 1
if retry_count < max_retries:
# Only log at debug level for intermediate retries
self.logger.debug(
f"Command '{command}' failed (attempt {retry_count}/{max_retries}): {e}"
)
await asyncio.sleep(1)
continue
else:
# Only log at error level for the final failure
self.logger.error(
f"Failed to send command '{command}' after {max_retries} retries"
)
self.logger.debug(f"Command failure details: {e}")
raise
raise last_error if last_error else RuntimeError("Failed to send command")
raise last_error if last_error else RuntimeError("Failed to send command")
async def wait_for_ready(self, timeout: int = 60, interval: float = 1.0):
"""Wait for WebSocket connection to become available."""