Files
computer/examples/som_examples.py
2025-10-22 11:35:31 -07:00

441 lines
17 KiB
Python

#!/usr/bin/env python3
"""
Example script demonstrating the usage of OmniParser's UI element detection functionality.
This script shows how to:
1. Initialize the OmniParser
2. Load and process images
3. Visualize detection results
4. Compare performance between CPU and MPS (Apple Silicon)
"""
import argparse
import base64
import glob
import io
import logging
import os
import sys
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
import numpy as np
from PIL import Image
# Load environment variables from .env file
project_root = Path(__file__).parent.parent
env_file = project_root / ".env"
print(f"Loading environment from: {env_file}")
from dotenv import load_dotenv
load_dotenv(env_file)
# Add paths to sys.path if needed
pythonpath = os.environ.get("PYTHONPATH", "")
for path in pythonpath.split(":"):
if path and path not in sys.path:
sys.path.append(path)
print(f"Added to sys.path: {path}")
# Add the libs directory to the path to find som
libs_path = project_root / "libs"
if str(libs_path) not in sys.path:
sys.path.append(str(libs_path))
print(f"Added to sys.path: {libs_path}")
from som import IconElement, OmniParser, ParseResult, TextElement
from som.models import BoundingBox, ParserMetadata, UIElement
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)
def setup_logging():
"""Configure logging with a nice format."""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
class Timer:
"""Enhanced context manager for timing code blocks."""
def __init__(self, name: str, logger):
self.name = name
self.logger = logger
self.start_time: float = 0.0
self.elapsed_time: float = 0.0
def __enter__(self):
self.start_time = time.time()
return self
def __exit__(self, *args):
self.elapsed_time = time.time() - self.start_time
self.logger.info(f"{self.name}: {self.elapsed_time:.3f}s")
return False
def image_to_bytes(image: Image.Image) -> bytes:
"""Convert PIL Image to PNG bytes."""
buf = io.BytesIO()
image.save(buf, format="PNG")
return buf.getvalue()
def process_image(
parser: OmniParser, image_path: str, output_dir: Path, use_ocr: bool = False
) -> None:
"""Process a single image and save the result."""
try:
# Load image
logger.info(f"Processing image: {image_path}")
image = Image.open(image_path).convert("RGB")
logger.info(f"Image loaded successfully, size: {image.size}")
# Create output filename
input_filename = Path(image_path).stem
output_path = output_dir / f"{input_filename}_analyzed.png"
# Convert image to PNG bytes
image_bytes = image_to_bytes(image)
# Process image
with Timer(f"Processing {input_filename}", logger):
result = parser.parse(image_bytes, use_ocr=use_ocr)
logger.info(
f"Found {result.metadata.num_icons} icons and {result.metadata.num_text} text elements"
)
# Save the annotated image
logger.info(f"Saving annotated image to: {output_path}")
try:
# Save image from base64
img_data = base64.b64decode(result.annotated_image_base64)
img = Image.open(io.BytesIO(img_data))
img.save(output_path)
# Print detailed results
logger.info("\nDetected Elements:")
for elem in result.elements:
if isinstance(elem, IconElement):
logger.info(
f"Icon: confidence={elem.confidence:.3f}, bbox={elem.bbox.coordinates}"
)
elif isinstance(elem, TextElement):
logger.info(
f"Text: '{elem.content}', confidence={elem.confidence:.3f}, bbox={elem.bbox.coordinates}"
)
# Verify file exists and log size
if output_path.exists():
logger.info(
f"Successfully saved image. File size: {output_path.stat().st_size} bytes"
)
else:
logger.error(f"Failed to verify file at {output_path}")
except Exception as e:
logger.error(f"Error saving image: {str(e)}", exc_info=True)
except Exception as e:
logger.error(f"Error processing image {image_path}: {str(e)}", exc_info=True)
def run_detection_benchmark(
input_path: str,
output_dir: Path,
use_ocr: bool = False,
box_threshold: float = 0.01,
iou_threshold: float = 0.1,
):
"""Run detection benchmark on images."""
logger.info(
f"Starting benchmark with OCR enabled: {use_ocr}, box_threshold: {box_threshold}, iou_threshold: {iou_threshold}"
)
try:
# Initialize parser
logger.info("Initializing OmniParser...")
parser = OmniParser()
# Create output directory
output_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"Output directory created at: {output_dir}")
# Get list of PNG files
if os.path.isdir(input_path):
image_files = glob.glob(os.path.join(input_path, "*.png"))
else:
image_files = [input_path]
logger.info(f"Found {len(image_files)} images to process")
# Process each image with specified thresholds
for image_path in image_files:
try:
# Load image
logger.info(f"Processing image: {image_path}")
image = Image.open(image_path).convert("RGB")
logger.info(f"Image loaded successfully, size: {image.size}")
# Create output filename
input_filename = Path(image_path).stem
output_path = output_dir / f"{input_filename}_analyzed.png"
# Convert image to PNG bytes
image_bytes = image_to_bytes(image)
# Process image with specified thresholds
with Timer(f"Processing {input_filename}", logger):
result = parser.parse(
image_bytes,
use_ocr=use_ocr,
box_threshold=box_threshold,
iou_threshold=iou_threshold,
)
logger.info(
f"Found {result.metadata.num_icons} icons and {result.metadata.num_text} text elements"
)
# Save the annotated image
logger.info(f"Saving annotated image to: {output_path}")
try:
# Save image from base64
img_data = base64.b64decode(result.annotated_image_base64)
img = Image.open(io.BytesIO(img_data))
img.save(output_path)
# Print detailed results
logger.info("\nDetected Elements:")
for elem in result.elements:
if isinstance(elem, IconElement):
logger.info(
f"Icon: confidence={elem.confidence:.3f}, bbox={elem.bbox.coordinates}"
)
elif isinstance(elem, TextElement):
logger.info(
f"Text: '{elem.content}', confidence={elem.confidence:.3f}, bbox={elem.bbox.coordinates}"
)
# Verify file exists and log size
if output_path.exists():
logger.info(
f"Successfully saved image. File size: {output_path.stat().st_size} bytes"
)
else:
logger.error(f"Failed to verify file at {output_path}")
except Exception as e:
logger.error(f"Error saving image: {str(e)}", exc_info=True)
except Exception as e:
logger.error(f"Error processing image {image_path}: {str(e)}", exc_info=True)
except Exception as e:
logger.error(f"Benchmark failed: {str(e)}", exc_info=True)
raise
def run_experiments(input_path: str, output_dir: Path, use_ocr: bool = False):
"""Run experiments with different threshold combinations."""
# Define threshold values to test
box_thresholds = [0.01, 0.05, 0.1, 0.3]
iou_thresholds = [0.05, 0.1, 0.2, 0.5]
logger.info("Starting threshold experiments...")
logger.info("Box thresholds to test: %s", box_thresholds)
logger.info("IOU thresholds to test: %s", iou_thresholds)
# Create results directory for this experiment
timestamp = time.strftime("%Y%m%d-%H%M%S")
ocr_suffix = "_ocr" if use_ocr else "_no_ocr"
exp_dir = output_dir / f"experiment_{timestamp}{ocr_suffix}"
exp_dir.mkdir(parents=True, exist_ok=True)
# Create a summary file
summary_file = exp_dir / "results_summary.txt"
with open(summary_file, "w") as f:
f.write("Threshold Experiments Results\n")
f.write("==========================\n\n")
f.write(f"Input: {input_path}\n")
f.write(f"OCR Enabled: {use_ocr}\n")
f.write(f"Date: {time.strftime('%Y-%m-%d %H:%M:%S')}\n\n")
f.write("Results:\n")
f.write("-" * 80 + "\n")
f.write(
f"{'Box Thresh':^10} | {'IOU Thresh':^10} | {'Num Icons':^10} | {'Num Text':^10} | {'Time (s)':^10}\n"
)
f.write("-" * 80 + "\n")
# Initialize parser once for all experiments
parser = OmniParser()
# Run experiments with each combination
for box_thresh in box_thresholds:
for iou_thresh in iou_thresholds:
logger.info(f"\nTesting box_threshold={box_thresh}, iou_threshold={iou_thresh}")
# Create directory for this combination
combo_dir = exp_dir / f"box_{box_thresh}_iou_{iou_thresh}"
combo_dir.mkdir(exist_ok=True)
try:
# Process each image
if os.path.isdir(input_path):
image_files = glob.glob(os.path.join(input_path, "*.png"))
else:
image_files = [input_path]
total_icons = 0
total_text = 0
total_time = 0
for image_path in image_files:
# Load and process image
image = Image.open(image_path).convert("RGB")
image_bytes = image_to_bytes(image)
# Process with current thresholds
with Timer(f"Processing {Path(image_path).stem}", logger) as t:
result = parser.parse(
image_bytes,
use_ocr=use_ocr,
box_threshold=box_thresh,
iou_threshold=iou_thresh,
)
# Save annotated image
output_path = combo_dir / f"{Path(image_path).stem}_analyzed.png"
img_data = base64.b64decode(result.annotated_image_base64)
img = Image.open(io.BytesIO(img_data))
img.save(output_path)
# Update totals
total_icons += result.metadata.num_icons
total_text += result.metadata.num_text
# Log detailed results
detail_file = combo_dir / f"{Path(image_path).stem}_details.txt"
with open(detail_file, "w") as detail_f:
detail_f.write(f"Results for {Path(image_path).name}\n")
detail_f.write("-" * 40 + "\n")
detail_f.write(f"Number of icons: {result.metadata.num_icons}\n")
detail_f.write(
f"Number of text elements: {result.metadata.num_text}\n\n"
)
detail_f.write("Icon Detections:\n")
icon_count = 1
text_count = (
result.metadata.num_icons + 1
) # Text boxes start after icons
# First list all icons
for elem in result.elements:
if isinstance(elem, IconElement):
detail_f.write(f"Box #{icon_count}: Icon\n")
detail_f.write(f" - Confidence: {elem.confidence:.3f}\n")
detail_f.write(
f" - Coordinates: {elem.bbox.coordinates}\n"
)
icon_count += 1
if use_ocr:
detail_f.write("\nText Detections:\n")
for elem in result.elements:
if isinstance(elem, TextElement):
detail_f.write(f"Box #{text_count}: Text\n")
detail_f.write(f" - Content: '{elem.content}'\n")
detail_f.write(
f" - Confidence: {elem.confidence:.3f}\n"
)
detail_f.write(
f" - Coordinates: {elem.bbox.coordinates}\n"
)
text_count += 1
# Update timing totals
total_time += t.elapsed_time
# Write summary for this combination
avg_time = total_time / len(image_files)
f.write(
f"{box_thresh:^10.3f} | {iou_thresh:^10.3f} | {total_icons:^10d} | {total_text:^10d} | {avg_time:^10.3f}\n"
)
except Exception as e:
logger.error(
f"Error in experiment box={box_thresh}, iou={iou_thresh}: {str(e)}"
)
f.write(
f"{box_thresh:^10.3f} | {iou_thresh:^10.3f} | {'ERROR':^10s} | {'ERROR':^10s} | {'ERROR':^10s}\n"
)
# Write summary footer
f.write("-" * 80 + "\n")
f.write("\nExperiment completed successfully!\n")
logger.info(f"\nExperiment results saved to {exp_dir}")
logger.info(f"Summary file: {summary_file}")
def main():
"""Main entry point."""
parser = argparse.ArgumentParser(description="Run OmniParser benchmark")
parser.add_argument("input_path", help="Path to input image or directory containing images")
parser.add_argument(
"--output-dir", default="examples/output", help="Output directory for annotated images"
)
parser.add_argument(
"--ocr",
choices=["none", "easyocr"],
default="none",
help="OCR engine to use (default: none)",
)
parser.add_argument(
"--mode",
choices=["single", "experiment"],
default="single",
help="Run mode: single run or threshold experiments (default: single)",
)
parser.add_argument(
"--box-threshold",
type=float,
default=0.01,
help="Confidence threshold for detection (default: 0.01)",
)
parser.add_argument(
"--iou-threshold",
type=float,
default=0.1,
help="IOU threshold for Non-Maximum Suppression (default: 0.1)",
)
args = parser.parse_args()
logger.info(f"Starting OmniParser with arguments: {args}")
use_ocr = args.ocr != "none"
output_dir = Path(args.output_dir)
try:
if args.mode == "experiment":
run_experiments(args.input_path, output_dir, use_ocr)
else:
run_detection_benchmark(
args.input_path, output_dir, use_ocr, args.box_threshold, args.iou_threshold
)
except Exception as e:
logger.error(f"Process failed: {str(e)}", exc_info=True)
return 1
return 0
if __name__ == "__main__":
sys.exit(main())