Fix Pylance lint

This commit is contained in:
f-trycua
2025-03-23 14:39:52 +01:00
parent 46fd3a0c7c
commit cc3891a7ad
8 changed files with 85 additions and 76 deletions

View File

@@ -32,10 +32,10 @@ async def run_omni_agent_example():
# Create agent with loop and provider
agent = ComputerAgent(
computer=computer,
# loop=AgentLoop.ANTHROPIC,
loop=AgentLoop.OMNI,
model=LLM(provider=LLMProvider.OPENAI, name="gpt-4.5-preview"),
# model=LLM(provider=LLMProvider.ANTHROPIC, name="claude-3-7-sonnet-20250219"),
loop=AgentLoop.ANTHROPIC,
# loop=AgentLoop.OMNI,
# model=LLM(provider=LLMProvider.OPENAI, name="gpt-4.5-preview"),
model=LLM(provider=LLMProvider.ANTHROPIC, name="claude-3-7-sonnet-20250219"),
save_trajectory=True,
trajectory_dir=str(Path("trajectories")),
only_n_most_recent_images=3,

View File

@@ -1,12 +1,6 @@
import asyncio
from pylume import (
PyLume,
ImageRef,
VMRunOpts,
SharedDirectory,
VMConfig,
VMUpdateOpts
)
from pylume import PyLume, ImageRef, VMRunOpts, SharedDirectory, VMConfig, VMUpdateOpts
async def main():
"""Example usage of PyLume."""
@@ -22,11 +16,11 @@ async def main():
vm_config = VMConfig(
name="lume-vm-new",
os="macOS",
cpu=2,
cpu=2,
memory="4GB",
disk_size="64GB",
disk_size="64GB", # type: ignore
display="1024x768",
ipsw="latest"
ipsw="latest",
)
await pylume.create_vm(vm_config)
@@ -39,7 +33,7 @@ async def main():
print("\n=== Listing Available Images ===")
images = await pylume.get_images()
print("Available Images:", images)
# List all VMs to verify creation
print("\n=== Listing All VMs ===")
vms = await pylume.list_vms()
@@ -52,30 +46,21 @@ async def main():
# Update VM settings
print("\n=== Updating VM Settings ===")
update_opts = VMUpdateOpts(
cpu=8,
memory="4GB"
)
update_opts = VMUpdateOpts(cpu=8, memory="4GB")
await pylume.update_vm("lume-vm", update_opts)
# Pull an image
# Pull an image
image_ref = ImageRef(
image="macos-sequoia-vanilla",
tag="latest",
registry="ghcr.io",
organization="trycua"
image="macos-sequoia-vanilla", tag="latest", registry="ghcr.io", organization="trycua"
)
await pylume.pull_image(image_ref, name="lume-vm-pulled")
# Run with shared directory
run_opts = VMRunOpts(
no_display=False,
shared_directories=[
SharedDirectory(
host_path="~/shared",
read_only=False
)
]
no_display=False, # type: ignore
shared_directories=[ # type: ignore
SharedDirectory(host_path="~/shared", read_only=False) # type: ignore
],
)
await pylume.run_vm("lume-vm", run_opts)
@@ -94,5 +79,6 @@ async def main():
print("\n=== Deleting VM ===")
await pylume.delete_vm("lume-vm-cloned")
if __name__ == '__main__':
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -74,31 +74,30 @@ class _BashSession:
async with asyncio.timeout(self._timeout):
while True:
await asyncio.sleep(self._output_delay)
# if we read directly from stdout/stderr, it will wait forever for
# EOF. use the StreamReader buffer directly instead.
output = (
self._process.stdout._buffer.decode()
) # pyright: ignore[reportAttributeAccessIssue]
if self._sentinel in output:
# strip the sentinel and break
output = output[: output.index(self._sentinel)]
break
# Read from stdout using the proper API
output_bytes = await self._process.stdout.read()
if output_bytes:
output = output_bytes.decode()
if self._sentinel in output:
# strip the sentinel and break
output = output[: output.index(self._sentinel)]
break
except asyncio.TimeoutError:
self._timed_out = True
raise ToolError(
f"timed out: bash has not returned in {self._timeout} seconds and must be restarted",
) from None
if output.endswith("\n"):
if output and output.endswith("\n"):
output = output[:-1]
error = self._process.stderr._buffer.decode() # pyright: ignore[reportAttributeAccessIssue]
if error.endswith("\n"):
# Read from stderr using the proper API
error_bytes = await self._process.stderr.read()
error = error_bytes.decode() if error_bytes else ""
if error and error.endswith("\n"):
error = error[:-1]
# clear the buffers so that the next output can be read correctly
self._process.stdout._buffer.clear() # pyright: ignore[reportAttributeAccessIssue]
self._process.stderr._buffer.clear() # pyright: ignore[reportAttributeAccessIssue]
# No need to clear buffers as we're using read() which consumes the data
return CLIResult(output=output, error=error)

View File

@@ -11,10 +11,6 @@ class LLMProvider(StrEnum):
ANTHROPIC = "anthropic"
OPENAI = "openai"
LLMProvider
@dataclass
class LLM:
"""Configuration for LLM model and provider."""

View File

@@ -5,9 +5,7 @@ from pylume.models import (
VMUpdateOpts,
ImageRef,
SharedDirectory,
VMStatus,
VMConfig,
CloneSpec,
VMStatus
)
import asyncio
from .models import Computer as ComputerConfig, Display
@@ -15,7 +13,6 @@ from .interface.factory import InterfaceFactory
import time
from PIL import Image
import io
from .utils import bytes_to_image
import re
from .logger import Logger, LogLevel
import json
@@ -266,7 +263,8 @@ class Computer:
# Log the equivalent curl command for debugging
payload = json.dumps({"noDisplay": False, "sharedDirectories": []})
curl_cmd = f"curl -X POST 'http://localhost:3000/lume/vms/{self.config.name}/run' -H 'Content-Type: application/json' -d '{payload}'"
print(f"\nEquivalent curl command:\n{curl_cmd}\n")
self.logger.info(f"Equivalent curl command:")
self.logger.info(f"{curl_cmd}")
try:
response = await self.config.pylume.run_vm(self.config.name, run_opts) # type: ignore[attr-defined]

View File

@@ -51,6 +51,7 @@ def process_text_box(box, image):
"""Process a single text box with OCR."""
try:
import easyocr
from typing import List, Tuple, Any, Sequence
x1 = int(min(point[0] for point in box))
y1 = int(min(point[1] for point in box))
@@ -67,12 +68,15 @@ def process_text_box(box, image):
region = image[y1:y2, x1:x2]
if region.size > 0:
reader = easyocr.Reader(["en"])
result = reader.readtext(region)
if result:
text = result[0][1] # Get text
conf = result[0][2] # Get confidence
if conf > 0.5:
return text, [x1, y1, x2, y2], conf
results = reader.readtext(region)
if results and len(results) > 0:
# EasyOCR returns a list of tuples (bbox, text, confidence)
first_result = results[0]
if isinstance(first_result, (list, tuple)) and len(first_result) >= 3:
text = str(first_result[1])
confidence = float(first_result[2])
if confidence > 0.5:
return text, [x1, y1, x2, y2], confidence
except Exception:
pass
return None
@@ -90,6 +94,10 @@ def check_ocr_box(image_path: Union[str, Path]) -> Tuple[List[str], List[List[fl
logger.error(f"Failed to read image: {image_path}")
return [], []
# Get image dimensions
img_height, img_width = image_cv.shape[:2]
confidence_threshold = 0.5
# Use EasyOCR
import ssl
import easyocr
@@ -120,7 +128,7 @@ def check_ocr_box(image_path: Union[str, Path]) -> Tuple[List[str], List[List[fl
x2 = max(point[0] for point in box)
y2 = max(point[1] for point in box)
if conf > 0.5: # Only keep higher confidence detections
if float(conf) > 0.5: # Only keep higher confidence detections
texts.append(text)
boxes.append([x1, y1, x2, y2])

View File

@@ -1,4 +1,4 @@
from typing import List, Dict, Any, Tuple
from typing import List, Dict, Any, Tuple, Union
import logging
import signal
from contextlib import contextmanager
@@ -137,14 +137,17 @@ class OCRProcessor:
img_width, img_height = image.size
for box, text, conf in results:
if conf < confidence_threshold:
# Ensure conf is float
conf_float = float(conf)
if conf_float < confidence_threshold:
continue
# Convert box format to [x1, y1, x2, y2]
x1 = min(point[0] for point in box) / img_width
y1 = min(point[1] for point in box) / img_height
x2 = max(point[0] for point in box) / img_width
y2 = max(point[1] for point in box) / img_height
# Ensure box points are properly typed as float
x1 = min(float(point[0]) for point in box) / img_width
y1 = min(float(point[1]) for point in box) / img_height
x2 = max(float(point[0]) for point in box) / img_width
y2 = max(float(point[1]) for point in box) / img_height
detections.append(
{

View File

@@ -3,7 +3,7 @@ import cv2
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
from typing import Union
from typing import Union, List, Tuple, Any, Optional, cast, Sequence
import time
import signal
from contextlib import contextmanager
@@ -56,7 +56,7 @@ def check_ocr_box(
goal_filtering=None,
easyocr_args=None,
use_paddleocr=False,
):
) -> Tuple[Tuple[List[str], List[Tuple[float, float, float, float]]], Optional[Any]]:
"""Check OCR box using EasyOCR with optimized settings.
Args:
@@ -66,6 +66,11 @@ def check_ocr_box(
goal_filtering: Optional filtering of results
easyocr_args: Arguments for EasyOCR
use_paddleocr: Ignored (kept for backward compatibility)
Returns:
Tuple containing:
- Tuple of (text_list, bounding_boxes)
- goal_filtering value
"""
logger.info("Starting OCR processing...")
start_time = time.time()
@@ -104,9 +109,11 @@ def check_ocr_box(
# Use EasyOCR with timeout
logger.info("Starting EasyOCR detection with 5 second timeout...")
with timeout(5): # 5 second timeout
result = reader.readtext(image_np, **default_args)
coord = [item[0] for item in result]
text = [item[1] for item in result]
# EasyOCR's readtext returns a list of tuples, where each tuple is (bbox, text, confidence)
raw_result = reader.readtext(image_np, **default_args)
result = cast(Sequence[Tuple[List[Tuple[float, float]], str, float]], raw_result)
coord = [item[0] for item in result] # item[0] is the bbox coordinates
text = [item[1] for item in result] # item[1] is the text content
logger.info(f"OCR completed successfully. Found {len(text)} text regions")
logger.info(f"Detected text: {text}")
@@ -129,7 +136,16 @@ def check_ocr_box(
for item in coord:
x, y, a, b = get_xywh(item)
bb.append((x, y, a, b))
cv2.rectangle(opencv_img, (x, y), (x + a, y + b), (0, 255, 0), 2)
# Convert float coordinates to integers for cv2.rectangle
x_val = cast(float, x)
y_val = cast(float, y)
a_val = cast(float, a)
b_val = cast(float, b)
x_int, y_int = int(x_val), int(y_val)
a_int, b_int = int(a_val), int(b_val)
cv2.rectangle(
opencv_img, (x_int, y_int), (x_int + a_int, y_int + b_int), (0, 255, 0), 2
)
plt.imshow(cv2.cvtColor(opencv_img, cv2.COLOR_BGR2RGB))
else:
if output_bb_format == "xywh":
@@ -137,6 +153,9 @@ def check_ocr_box(
elif output_bb_format == "xyxy":
bb = [get_xyxy(item) for item in coord]
# Cast the bounding boxes to the expected type
bb = cast(List[Tuple[float, float, float, float]], bb)
logger.info("OCR processing complete")
return (text, bb), goal_filtering