diff --git a/libs/computer/computer/interface/linux.py b/libs/computer/computer/interface/linux.py index 401730ca..68ba5706 100644 --- a/libs/computer/computer/interface/linux.py +++ b/libs/computer/computer/interface/linux.py @@ -27,6 +27,7 @@ class LinuxComputerInterface(BaseComputerInterface): self._max_reconnect_delay = 30 # Maximum delay between reconnection attempts self._log_connection_attempts = True # Flag to control connection attempt logging self._authenticated = False # Track authentication status + self._command_lock = asyncio.Lock() # Lock to ensure only one command at a time # Set logger name for Linux interface self.logger = Logger("cua.interface.linux", LogLevel.NORMAL) @@ -193,58 +194,62 @@ class LinuxComputerInterface(BaseComputerInterface): retry_count = 0 last_error = None - while retry_count < max_retries: - try: - await self._ensure_connection() - if not self._ws: - raise ConnectionError("WebSocket connection is not established") + # Acquire lock to ensure only one command is processed at a time + async with self._command_lock: + self.logger.debug(f"Acquired lock for command: {command}") + while retry_count < max_retries: + try: + await self._ensure_connection() + if not self._ws: + raise ConnectionError("WebSocket connection is not established") - # Handle authentication if needed - if self.api_key and self.vm_name and not self._authenticated: - self.logger.info("Performing authentication handshake...") - auth_message = { - "command": "authenticate", - "params": { - "api_key": self.api_key, - "container_name": self.vm_name + # Handle authentication if needed + if self.api_key and self.vm_name and not self._authenticated: + self.logger.info("Performing authentication handshake...") + auth_message = { + "command": "authenticate", + "params": { + "api_key": self.api_key, + "container_name": self.vm_name + } } - } - await self._ws.send(json.dumps(auth_message)) - - # Wait for authentication response - auth_response = await asyncio.wait_for(self._ws.recv(), timeout=10) - auth_result = json.loads(auth_response) - - if not auth_result.get("success"): - error_msg = auth_result.get("error", "Authentication failed") - self.logger.error(f"Authentication failed: {error_msg}") - self._authenticated = False - raise ConnectionError(f"Authentication failed: {error_msg}") - - self.logger.info("Authentication successful") - self._authenticated = True + await self._ws.send(json.dumps(auth_message)) + + # Wait for authentication response + auth_response = await asyncio.wait_for(self._ws.recv(), timeout=10) + auth_result = json.loads(auth_response) + + if not auth_result.get("success"): + error_msg = auth_result.get("error", "Authentication failed") + self.logger.error(f"Authentication failed: {error_msg}") + self._authenticated = False + raise ConnectionError(f"Authentication failed: {error_msg}") + + self.logger.info("Authentication successful") + self._authenticated = True - message = {"command": command, "params": params or {}} - await self._ws.send(json.dumps(message)) - response = await asyncio.wait_for(self._ws.recv(), timeout=30) - return json.loads(response) - except Exception as e: - last_error = e - retry_count += 1 - if retry_count < max_retries: - # Only log at debug level for intermediate retries - self.logger.debug( - f"Command '{command}' failed (attempt {retry_count}/{max_retries}): {e}" - ) - await asyncio.sleep(1) - continue - else: - # Only log at error level for the final failure - self.logger.error( - f"Failed to send command '{command}' after {max_retries} retries" - ) - self.logger.debug(f"Command failure details: {e}") - raise last_error if last_error else RuntimeError("Failed to send command") + message = {"command": command, "params": params or {}} + await self._ws.send(json.dumps(message)) + response = await asyncio.wait_for(self._ws.recv(), timeout=30) + self.logger.debug(f"Completed command: {command}") + return json.loads(response) + except Exception as e: + last_error = e + retry_count += 1 + if retry_count < max_retries: + # Only log at debug level for intermediate retries + self.logger.debug( + f"Command '{command}' failed (attempt {retry_count}/{max_retries}): {e}" + ) + await asyncio.sleep(1) + continue + else: + # Only log at error level for the final failure + self.logger.error( + f"Failed to send command '{command}' after {max_retries} retries" + ) + self.logger.debug(f"Command failure details: {e}") + raise last_error if last_error else RuntimeError("Failed to send command") async def wait_for_ready(self, timeout: int = 60, interval: float = 1.0): """Wait for WebSocket connection to become available.""" diff --git a/libs/computer/computer/interface/macos.py b/libs/computer/computer/interface/macos.py index a96c44d1..3daa4fdf 100644 --- a/libs/computer/computer/interface/macos.py +++ b/libs/computer/computer/interface/macos.py @@ -26,6 +26,7 @@ class MacOSComputerInterface(BaseComputerInterface): self._reconnect_delay = 1 # Start with 1 second delay self._max_reconnect_delay = 30 # Maximum delay between reconnection attempts self._log_connection_attempts = True # Flag to control connection attempt logging + self._command_lock = asyncio.Lock() # Lock to ensure only one command at a time # Set logger name for macOS interface self.logger = Logger("cua.interface.macos", LogLevel.NORMAL) @@ -219,35 +220,39 @@ class MacOSComputerInterface(BaseComputerInterface): retry_count = 0 last_error = None - while retry_count < max_retries: - try: - await self._ensure_connection() - if not self._ws: - raise ConnectionError("WebSocket connection is not established") + # Acquire lock to ensure only one command is processed at a time + async with self._command_lock: + self.logger.debug(f"Acquired lock for command: {command}") + while retry_count < max_retries: + try: + await self._ensure_connection() + if not self._ws: + raise ConnectionError("WebSocket connection is not established") - message = {"command": command, "params": params or {}} - await self._ws.send(json.dumps(message)) - response = await asyncio.wait_for(self._ws.recv(), timeout=30) - return json.loads(response) - except Exception as e: - last_error = e - retry_count += 1 - if retry_count < max_retries: - # Only log at debug level for intermediate retries - self.logger.debug( - f"Command '{command}' failed (attempt {retry_count}/{max_retries}): {e}" - ) - await asyncio.sleep(1) - continue - else: - # Only log at error level for the final failure - self.logger.error( - f"Failed to send command '{command}' after {max_retries} retries" - ) - self.logger.debug(f"Command failure details: {e}") - raise + message = {"command": command, "params": params or {}} + await self._ws.send(json.dumps(message)) + response = await asyncio.wait_for(self._ws.recv(), timeout=30) + self.logger.debug(f"Completed command: {command}") + return json.loads(response) + except Exception as e: + last_error = e + retry_count += 1 + if retry_count < max_retries: + # Only log at debug level for intermediate retries + self.logger.debug( + f"Command '{command}' failed (attempt {retry_count}/{max_retries}): {e}" + ) + await asyncio.sleep(1) + continue + else: + # Only log at error level for the final failure + self.logger.error( + f"Failed to send command '{command}' after {max_retries} retries" + ) + self.logger.debug(f"Command failure details: {e}") + raise - raise last_error if last_error else RuntimeError("Failed to send command") + raise last_error if last_error else RuntimeError("Failed to send command") async def wait_for_ready(self, timeout: int = 60, interval: float = 1.0): """Wait for WebSocket connection to become available."""