From d9cd8154fcc51f01a039264aecd837b606620038 Mon Sep 17 00:00:00 2001 From: Dillon DuPont Date: Mon, 28 Jul 2025 13:47:25 -0400 Subject: [PATCH] Added telemtry_enabled kwarg --- libs/python/agent/agent/__init__.py | 45 ++++ libs/python/agent/agent/agent.py | 19 +- libs/python/agent/agent/callbacks/__init__.py | 2 + .../python/agent/agent/callbacks/telemetry.py | 210 ++++++++++++++++++ libs/python/agent/agent/telemetry.py | 142 ++++++++++++ 5 files changed, 417 insertions(+), 1 deletion(-) create mode 100644 libs/python/agent/agent/callbacks/telemetry.py create mode 100644 libs/python/agent/agent/telemetry.py diff --git a/libs/python/agent/agent/__init__.py b/libs/python/agent/agent/__init__.py index cc29b372..6797dab6 100644 --- a/libs/python/agent/agent/__init__.py +++ b/libs/python/agent/agent/__init__.py @@ -2,6 +2,9 @@ agent - Decorator-based Computer Use Agent with liteLLM integration """ +import logging +import sys + from .decorators import agent_loop from .agent import ComputerAgent from .types import Messages, AgentResponse @@ -17,3 +20,45 @@ __all__ = [ ] __version__ = "0.4.0" + +logger = logging.getLogger(__name__) + +# Initialize telemetry when the package is imported +try: + # Import from core telemetry for basic functions + from core.telemetry import ( + is_telemetry_enabled, + flush, + record_event, + ) + + # Import set_dimension from our own telemetry module + from .telemetry import set_dimension + + # Check if telemetry is enabled + if is_telemetry_enabled(): + logger.info("Telemetry is enabled") + + # Record package initialization + record_event( + "module_init", + { + "module": "agent", + "version": __version__, + "python_version": sys.version, + }, + ) + + # Set the package version as a dimension + set_dimension("agent_version", __version__) + + # Flush events to ensure they're sent + flush() + else: + logger.info("Telemetry is disabled") +except ImportError as e: + # Telemetry not available + logger.warning(f"Telemetry not available: {e}") +except Exception as e: + # Other issues with telemetry + logger.warning(f"Error initializing telemetry: {e}") diff --git a/libs/python/agent/agent/agent.py b/libs/python/agent/agent/agent.py index ba86a632..0b9f243a 100644 --- a/libs/python/agent/agent/agent.py +++ b/libs/python/agent/agent/agent.py @@ -6,6 +6,7 @@ import asyncio from typing import Dict, List, Any, Optional, AsyncGenerator, Union, cast, Callable, Set from litellm.responses.utils import Usage + from .types import Messages, Computer from .decorators import find_agent_loop from .computer_handler import OpenAIComputerHandler, acknowledge_safety_check_callback, check_blocklisted_url @@ -14,7 +15,13 @@ import litellm import litellm.utils import inspect from .adapters import HuggingFaceLocalAdapter -from .callbacks import ImageRetentionCallback, LoggingCallback, TrajectorySaverCallback, BudgetManagerCallback +from .callbacks import ( + ImageRetentionCallback, + LoggingCallback, + TrajectorySaverCallback, + BudgetManagerCallback, + TelemetryCallback, +) def get_json(obj: Any, max_depth: int = 10) -> Any: def custom_serializer(o: Any, depth: int = 0, seen: Set[int] = None) -> Any: @@ -129,6 +136,7 @@ class ComputerAgent: screenshot_delay: Optional[float | int] = 0.5, use_prompt_caching: Optional[bool] = False, max_trajectory_budget: Optional[float | dict] = None, + telemetry_enabled: Optional[bool] = True, **kwargs ): """ @@ -146,6 +154,7 @@ class ComputerAgent: screenshot_delay: Delay before screenshots in seconds use_prompt_caching: If set, use prompt caching to avoid reprocessing the same prompt. Intended for use with anthropic providers. max_trajectory_budget: If set, adds BudgetManagerCallback to track usage costs and stop when budget is exceeded + telemetry_enabled: If set, adds TelemetryCallback to track anonymized usage data. Enabled by default. **kwargs: Additional arguments passed to the agent loop """ self.model = model @@ -158,10 +167,18 @@ class ComputerAgent: self.max_retries = max_retries self.screenshot_delay = screenshot_delay self.use_prompt_caching = use_prompt_caching + self.telemetry_enabled = telemetry_enabled self.kwargs = kwargs # == Add built-in callbacks == + # Add telemetry callback if telemetry_enabled is set + if self.telemetry_enabled: + if isinstance(self.telemetry_enabled, bool): + self.callbacks.append(TelemetryCallback(self)) + else: + self.callbacks.append(TelemetryCallback(self, **self.telemetry_enabled)) + # Add logging callback if verbosity is set if self.verbosity is not None: self.callbacks.append(LoggingCallback(level=self.verbosity)) diff --git a/libs/python/agent/agent/callbacks/__init__.py b/libs/python/agent/agent/callbacks/__init__.py index 6f364b1d..ffe34551 100644 --- a/libs/python/agent/agent/callbacks/__init__.py +++ b/libs/python/agent/agent/callbacks/__init__.py @@ -7,6 +7,7 @@ from .image_retention import ImageRetentionCallback from .logging import LoggingCallback from .trajectory_saver import TrajectorySaverCallback from .budget_manager import BudgetManagerCallback +from .telemetry import TelemetryCallback __all__ = [ "AsyncCallbackHandler", @@ -14,4 +15,5 @@ __all__ = [ "LoggingCallback", "TrajectorySaverCallback", "BudgetManagerCallback", + "TelemetryCallback", ] diff --git a/libs/python/agent/agent/callbacks/telemetry.py b/libs/python/agent/agent/callbacks/telemetry.py new file mode 100644 index 00000000..19e5e6b9 --- /dev/null +++ b/libs/python/agent/agent/callbacks/telemetry.py @@ -0,0 +1,210 @@ +""" +Telemetry callback handler for Computer-Use Agent (cua-agent) +""" + +import time +import uuid +from typing import List, Dict, Any, Optional, Union + +from .base import AsyncCallbackHandler +from ..telemetry import ( + record_event, + is_telemetry_enabled, + set_dimension, + SYSTEM_INFO, +) + + +class TelemetryCallback(AsyncCallbackHandler): + """ + Telemetry callback handler for Computer-Use Agent (cua-agent) + + Tracks agent usage, performance metrics, and optionally trajectory data. + """ + + def __init__( + self, + agent, + log_trajectory: bool = False + ): + """ + Initialize telemetry callback. + + Args: + agent: The ComputerAgent instance + log_trajectory: Whether to log full trajectory items (opt-in) + """ + self.agent = agent + self.log_trajectory = log_trajectory + + # Generate session/run IDs + self.session_id = str(uuid.uuid4()) + self.run_id = None + + # Track timing and metrics + self.run_start_time = None + self.step_count = 0 + self.step_start_time = None + self.total_usage = { + "prompt_tokens": 0, + "completion_tokens": 0, + "total_tokens": 0, + "response_cost": 0.0 + } + + # Record agent initialization + if is_telemetry_enabled(): + self._record_agent_initialization() + + def _record_agent_initialization(self) -> None: + """Record agent type/model and session initialization.""" + agent_info = { + "session_id": self.session_id, + "agent_type": self.agent.__class__.__name__, + "model": getattr(self.agent, 'model', 'unknown'), + **SYSTEM_INFO + } + + # Set session-level dimensions + set_dimension("session_id", self.session_id) + set_dimension("agent_type", agent_info["agent_type"]) + set_dimension("model", agent_info["model"]) + + record_event("agent_session_start", agent_info) + + async def on_run_start(self, kwargs: Dict[str, Any], old_items: List[Dict[str, Any]]) -> None: + """Called at the start of an agent run loop.""" + if not is_telemetry_enabled(): + return + + self.run_id = str(uuid.uuid4()) + self.run_start_time = time.time() + self.step_count = 0 + + # Calculate input context size + input_context_size = self._calculate_context_size(old_items) + + run_data = { + "session_id": self.session_id, + "run_id": self.run_id, + "start_time": self.run_start_time, + "input_context_size": input_context_size, + "num_existing_messages": len(old_items) + } + + # Log trajectory if opted in + if self.log_trajectory: + trajectory = self._extract_trajectory(old_items) + if trajectory: + run_data["uploaded_trajectory"] = trajectory + + set_dimension("run_id", self.run_id) + record_event("agent_run_start", run_data) + + async def on_run_end(self, kwargs: Dict[str, Any], old_items: List[Dict[str, Any]], new_items: List[Dict[str, Any]]) -> None: + """Called at the end of an agent run loop.""" + if not is_telemetry_enabled() or not self.run_start_time: + return + + run_duration = time.time() - self.run_start_time + + run_data = { + "session_id": self.session_id, + "run_id": self.run_id, + "end_time": time.time(), + "duration_seconds": run_duration, + "num_steps": self.step_count, + "total_usage": self.total_usage.copy() + } + + # Log trajectory if opted in + if self.log_trajectory: + trajectory = self._extract_trajectory(new_items) + if trajectory: + run_data["uploaded_trajectory"] = trajectory + + record_event("agent_run_end", run_data) + + async def on_usage(self, usage: Dict[str, Any]) -> None: + """Called when usage information is received.""" + if not is_telemetry_enabled(): + return + + # Accumulate usage stats + self.total_usage["prompt_tokens"] += usage.get("prompt_tokens", 0) + self.total_usage["completion_tokens"] += usage.get("completion_tokens", 0) + self.total_usage["total_tokens"] += usage.get("total_tokens", 0) + self.total_usage["response_cost"] += usage.get("response_cost", 0.0) + + # Record individual usage event + usage_data = { + "session_id": self.session_id, + "run_id": self.run_id, + "step": self.step_count, + **usage + } + + record_event("agent_usage", usage_data) + + async def on_responses(self, kwargs: Dict[str, Any], responses: Dict[str, Any]) -> None: + """Called when responses are received.""" + if not is_telemetry_enabled(): + return + + self.step_count += 1 + step_duration = None + + if self.step_start_time: + step_duration = time.time() - self.step_start_time + + self.step_start_time = time.time() + + step_data = { + "session_id": self.session_id, + "run_id": self.run_id, + "step": self.step_count, + "timestamp": self.step_start_time + } + + if step_duration is not None: + step_data["duration_seconds"] = step_duration + + record_event("agent_step", step_data) + + def _calculate_context_size(self, items: List[Dict[str, Any]]) -> int: + """Calculate approximate context size in tokens/characters.""" + total_size = 0 + + for item in items: + if item.get("type") == "message" and "content" in item: + content = item["content"] + if isinstance(content, str): + total_size += len(content) + elif isinstance(content, list): + for part in content: + if isinstance(part, dict) and "text" in part: + total_size += len(part["text"]) + elif "content" in item and isinstance(item["content"], str): + total_size += len(item["content"]) + + return total_size + + def _extract_trajectory(self, items: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Extract trajectory items that should be logged.""" + trajectory = [] + + for item in items: + # Include user messages, assistant messages, reasoning, computer calls, and computer outputs + if ( + item.get("role") == "user" or # User inputs + (item.get("type") == "message" and item.get("role") == "assistant") or # Model outputs + item.get("type") == "reasoning" or # Reasoning traces + item.get("type") == "computer_call" or # Computer actions + item.get("type") == "computer_call_output" # Computer outputs + ): + # Create a copy of the item with timestamp + trajectory_item = item.copy() + trajectory_item["logged_at"] = time.time() + trajectory.append(trajectory_item) + + return trajectory \ No newline at end of file diff --git a/libs/python/agent/agent/telemetry.py b/libs/python/agent/agent/telemetry.py new file mode 100644 index 00000000..d3e33a25 --- /dev/null +++ b/libs/python/agent/agent/telemetry.py @@ -0,0 +1,142 @@ +"""Agent telemetry for tracking anonymous usage and feature usage.""" + +import logging +import os +import platform +import sys +from typing import Dict, Any, Callable + +# Import the core telemetry module +TELEMETRY_AVAILABLE = False + + +# Local fallbacks in case core telemetry isn't available +def _noop(*args: Any, **kwargs: Any) -> None: + """No-op function for when telemetry is not available.""" + pass + + +# Define default functions with unique names to avoid shadowing +_default_record_event = _noop +_default_increment_counter = _noop +_default_set_dimension = _noop +_default_get_telemetry_client = lambda: None +_default_flush = _noop +_default_is_telemetry_enabled = lambda: False +_default_is_telemetry_globally_disabled = lambda: True + +# Set the actual functions to the defaults initially +record_event = _default_record_event +increment_counter = _default_increment_counter +set_dimension = _default_set_dimension +get_telemetry_client = _default_get_telemetry_client +flush = _default_flush +is_telemetry_enabled = _default_is_telemetry_enabled +is_telemetry_globally_disabled = _default_is_telemetry_globally_disabled + +logger = logging.getLogger("agent.telemetry") + +try: + # Import from core telemetry + from core.telemetry import ( + record_event as core_record_event, + increment as core_increment, + get_telemetry_client as core_get_telemetry_client, + flush as core_flush, + is_telemetry_enabled as core_is_telemetry_enabled, + is_telemetry_globally_disabled as core_is_telemetry_globally_disabled, + ) + + # Override the default functions with actual implementations + record_event = core_record_event + get_telemetry_client = core_get_telemetry_client + flush = core_flush + is_telemetry_enabled = core_is_telemetry_enabled + is_telemetry_globally_disabled = core_is_telemetry_globally_disabled + + def increment_counter(counter_name: str, value: int = 1) -> None: + """Wrapper for increment to maintain backward compatibility.""" + if is_telemetry_enabled(): + core_increment(counter_name, value) + + def set_dimension(name: str, value: Any) -> None: + """Set a dimension that will be attached to all events.""" + logger.debug(f"Setting dimension {name}={value}") + + TELEMETRY_AVAILABLE = True + logger.info("Successfully imported telemetry") +except ImportError as e: + logger.warning(f"Could not import telemetry: {e}") + logger.debug("Telemetry not available, using no-op functions") + +# Get system info once to use in telemetry +SYSTEM_INFO = { + "os": platform.system().lower(), + "os_version": platform.release(), + "python_version": platform.python_version(), +} + + +def enable_telemetry() -> bool: + """Enable telemetry if available. + + Returns: + bool: True if telemetry was successfully enabled, False otherwise + """ + global TELEMETRY_AVAILABLE, record_event, increment_counter, get_telemetry_client, flush, is_telemetry_enabled, is_telemetry_globally_disabled + + # Check if globally disabled using core function + if TELEMETRY_AVAILABLE and is_telemetry_globally_disabled(): + logger.info("Telemetry is globally disabled via environment variable - cannot enable") + return False + + # Already enabled + if TELEMETRY_AVAILABLE: + return True + + # Try to import and enable + try: + from core.telemetry import ( + record_event, + increment, + get_telemetry_client, + flush, + is_telemetry_globally_disabled, + ) + + # Check again after import + if is_telemetry_globally_disabled(): + logger.info("Telemetry is globally disabled via environment variable - cannot enable") + return False + + TELEMETRY_AVAILABLE = True + logger.info("Telemetry successfully enabled") + return True + except ImportError as e: + logger.warning(f"Could not enable telemetry: {e}") + return False + + +def is_telemetry_enabled() -> bool: + """Check if telemetry is enabled. + + Returns: + bool: True if telemetry is enabled, False otherwise + """ + # Use the core function if available, otherwise use our local flag + if TELEMETRY_AVAILABLE: + from core.telemetry import is_telemetry_enabled as core_is_enabled + + return core_is_enabled() + return False + + +def record_agent_initialization() -> None: + """Record when an agent instance is initialized.""" + if TELEMETRY_AVAILABLE and is_telemetry_enabled(): + record_event("agent_initialized", SYSTEM_INFO) + + # Set dimensions that will be attached to all events + set_dimension("os", SYSTEM_INFO["os"]) + set_dimension("os_version", SYSTEM_INFO["os_version"]) + set_dimension("python_version", SYSTEM_INFO["python_version"])