Added telemtry_enabled kwarg

This commit is contained in:
Dillon DuPont
2025-07-28 13:47:25 -04:00
parent c61a55c986
commit d9cd8154fc
5 changed files with 417 additions and 1 deletions

View File

@@ -2,6 +2,9 @@
agent - Decorator-based Computer Use Agent with liteLLM integration
"""
import logging
import sys
from .decorators import agent_loop
from .agent import ComputerAgent
from .types import Messages, AgentResponse
@@ -17,3 +20,45 @@ __all__ = [
]
__version__ = "0.4.0"
logger = logging.getLogger(__name__)
# Initialize telemetry when the package is imported
try:
# Import from core telemetry for basic functions
from core.telemetry import (
is_telemetry_enabled,
flush,
record_event,
)
# Import set_dimension from our own telemetry module
from .telemetry import set_dimension
# Check if telemetry is enabled
if is_telemetry_enabled():
logger.info("Telemetry is enabled")
# Record package initialization
record_event(
"module_init",
{
"module": "agent",
"version": __version__,
"python_version": sys.version,
},
)
# Set the package version as a dimension
set_dimension("agent_version", __version__)
# Flush events to ensure they're sent
flush()
else:
logger.info("Telemetry is disabled")
except ImportError as e:
# Telemetry not available
logger.warning(f"Telemetry not available: {e}")
except Exception as e:
# Other issues with telemetry
logger.warning(f"Error initializing telemetry: {e}")

View File

@@ -6,6 +6,7 @@ import asyncio
from typing import Dict, List, Any, Optional, AsyncGenerator, Union, cast, Callable, Set
from litellm.responses.utils import Usage
from .types import Messages, Computer
from .decorators import find_agent_loop
from .computer_handler import OpenAIComputerHandler, acknowledge_safety_check_callback, check_blocklisted_url
@@ -14,7 +15,13 @@ import litellm
import litellm.utils
import inspect
from .adapters import HuggingFaceLocalAdapter
from .callbacks import ImageRetentionCallback, LoggingCallback, TrajectorySaverCallback, BudgetManagerCallback
from .callbacks import (
ImageRetentionCallback,
LoggingCallback,
TrajectorySaverCallback,
BudgetManagerCallback,
TelemetryCallback,
)
def get_json(obj: Any, max_depth: int = 10) -> Any:
def custom_serializer(o: Any, depth: int = 0, seen: Set[int] = None) -> Any:
@@ -129,6 +136,7 @@ class ComputerAgent:
screenshot_delay: Optional[float | int] = 0.5,
use_prompt_caching: Optional[bool] = False,
max_trajectory_budget: Optional[float | dict] = None,
telemetry_enabled: Optional[bool] = True,
**kwargs
):
"""
@@ -146,6 +154,7 @@ class ComputerAgent:
screenshot_delay: Delay before screenshots in seconds
use_prompt_caching: If set, use prompt caching to avoid reprocessing the same prompt. Intended for use with anthropic providers.
max_trajectory_budget: If set, adds BudgetManagerCallback to track usage costs and stop when budget is exceeded
telemetry_enabled: If set, adds TelemetryCallback to track anonymized usage data. Enabled by default.
**kwargs: Additional arguments passed to the agent loop
"""
self.model = model
@@ -158,10 +167,18 @@ class ComputerAgent:
self.max_retries = max_retries
self.screenshot_delay = screenshot_delay
self.use_prompt_caching = use_prompt_caching
self.telemetry_enabled = telemetry_enabled
self.kwargs = kwargs
# == Add built-in callbacks ==
# Add telemetry callback if telemetry_enabled is set
if self.telemetry_enabled:
if isinstance(self.telemetry_enabled, bool):
self.callbacks.append(TelemetryCallback(self))
else:
self.callbacks.append(TelemetryCallback(self, **self.telemetry_enabled))
# Add logging callback if verbosity is set
if self.verbosity is not None:
self.callbacks.append(LoggingCallback(level=self.verbosity))

View File

@@ -7,6 +7,7 @@ from .image_retention import ImageRetentionCallback
from .logging import LoggingCallback
from .trajectory_saver import TrajectorySaverCallback
from .budget_manager import BudgetManagerCallback
from .telemetry import TelemetryCallback
__all__ = [
"AsyncCallbackHandler",
@@ -14,4 +15,5 @@ __all__ = [
"LoggingCallback",
"TrajectorySaverCallback",
"BudgetManagerCallback",
"TelemetryCallback",
]

View File

@@ -0,0 +1,210 @@
"""
Telemetry callback handler for Computer-Use Agent (cua-agent)
"""
import time
import uuid
from typing import List, Dict, Any, Optional, Union
from .base import AsyncCallbackHandler
from ..telemetry import (
record_event,
is_telemetry_enabled,
set_dimension,
SYSTEM_INFO,
)
class TelemetryCallback(AsyncCallbackHandler):
"""
Telemetry callback handler for Computer-Use Agent (cua-agent)
Tracks agent usage, performance metrics, and optionally trajectory data.
"""
def __init__(
self,
agent,
log_trajectory: bool = False
):
"""
Initialize telemetry callback.
Args:
agent: The ComputerAgent instance
log_trajectory: Whether to log full trajectory items (opt-in)
"""
self.agent = agent
self.log_trajectory = log_trajectory
# Generate session/run IDs
self.session_id = str(uuid.uuid4())
self.run_id = None
# Track timing and metrics
self.run_start_time = None
self.step_count = 0
self.step_start_time = None
self.total_usage = {
"prompt_tokens": 0,
"completion_tokens": 0,
"total_tokens": 0,
"response_cost": 0.0
}
# Record agent initialization
if is_telemetry_enabled():
self._record_agent_initialization()
def _record_agent_initialization(self) -> None:
"""Record agent type/model and session initialization."""
agent_info = {
"session_id": self.session_id,
"agent_type": self.agent.__class__.__name__,
"model": getattr(self.agent, 'model', 'unknown'),
**SYSTEM_INFO
}
# Set session-level dimensions
set_dimension("session_id", self.session_id)
set_dimension("agent_type", agent_info["agent_type"])
set_dimension("model", agent_info["model"])
record_event("agent_session_start", agent_info)
async def on_run_start(self, kwargs: Dict[str, Any], old_items: List[Dict[str, Any]]) -> None:
"""Called at the start of an agent run loop."""
if not is_telemetry_enabled():
return
self.run_id = str(uuid.uuid4())
self.run_start_time = time.time()
self.step_count = 0
# Calculate input context size
input_context_size = self._calculate_context_size(old_items)
run_data = {
"session_id": self.session_id,
"run_id": self.run_id,
"start_time": self.run_start_time,
"input_context_size": input_context_size,
"num_existing_messages": len(old_items)
}
# Log trajectory if opted in
if self.log_trajectory:
trajectory = self._extract_trajectory(old_items)
if trajectory:
run_data["uploaded_trajectory"] = trajectory
set_dimension("run_id", self.run_id)
record_event("agent_run_start", run_data)
async def on_run_end(self, kwargs: Dict[str, Any], old_items: List[Dict[str, Any]], new_items: List[Dict[str, Any]]) -> None:
"""Called at the end of an agent run loop."""
if not is_telemetry_enabled() or not self.run_start_time:
return
run_duration = time.time() - self.run_start_time
run_data = {
"session_id": self.session_id,
"run_id": self.run_id,
"end_time": time.time(),
"duration_seconds": run_duration,
"num_steps": self.step_count,
"total_usage": self.total_usage.copy()
}
# Log trajectory if opted in
if self.log_trajectory:
trajectory = self._extract_trajectory(new_items)
if trajectory:
run_data["uploaded_trajectory"] = trajectory
record_event("agent_run_end", run_data)
async def on_usage(self, usage: Dict[str, Any]) -> None:
"""Called when usage information is received."""
if not is_telemetry_enabled():
return
# Accumulate usage stats
self.total_usage["prompt_tokens"] += usage.get("prompt_tokens", 0)
self.total_usage["completion_tokens"] += usage.get("completion_tokens", 0)
self.total_usage["total_tokens"] += usage.get("total_tokens", 0)
self.total_usage["response_cost"] += usage.get("response_cost", 0.0)
# Record individual usage event
usage_data = {
"session_id": self.session_id,
"run_id": self.run_id,
"step": self.step_count,
**usage
}
record_event("agent_usage", usage_data)
async def on_responses(self, kwargs: Dict[str, Any], responses: Dict[str, Any]) -> None:
"""Called when responses are received."""
if not is_telemetry_enabled():
return
self.step_count += 1
step_duration = None
if self.step_start_time:
step_duration = time.time() - self.step_start_time
self.step_start_time = time.time()
step_data = {
"session_id": self.session_id,
"run_id": self.run_id,
"step": self.step_count,
"timestamp": self.step_start_time
}
if step_duration is not None:
step_data["duration_seconds"] = step_duration
record_event("agent_step", step_data)
def _calculate_context_size(self, items: List[Dict[str, Any]]) -> int:
"""Calculate approximate context size in tokens/characters."""
total_size = 0
for item in items:
if item.get("type") == "message" and "content" in item:
content = item["content"]
if isinstance(content, str):
total_size += len(content)
elif isinstance(content, list):
for part in content:
if isinstance(part, dict) and "text" in part:
total_size += len(part["text"])
elif "content" in item and isinstance(item["content"], str):
total_size += len(item["content"])
return total_size
def _extract_trajectory(self, items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Extract trajectory items that should be logged."""
trajectory = []
for item in items:
# Include user messages, assistant messages, reasoning, computer calls, and computer outputs
if (
item.get("role") == "user" or # User inputs
(item.get("type") == "message" and item.get("role") == "assistant") or # Model outputs
item.get("type") == "reasoning" or # Reasoning traces
item.get("type") == "computer_call" or # Computer actions
item.get("type") == "computer_call_output" # Computer outputs
):
# Create a copy of the item with timestamp
trajectory_item = item.copy()
trajectory_item["logged_at"] = time.time()
trajectory.append(trajectory_item)
return trajectory

View File

@@ -0,0 +1,142 @@
"""Agent telemetry for tracking anonymous usage and feature usage."""
import logging
import os
import platform
import sys
from typing import Dict, Any, Callable
# Import the core telemetry module
TELEMETRY_AVAILABLE = False
# Local fallbacks in case core telemetry isn't available
def _noop(*args: Any, **kwargs: Any) -> None:
"""No-op function for when telemetry is not available."""
pass
# Define default functions with unique names to avoid shadowing
_default_record_event = _noop
_default_increment_counter = _noop
_default_set_dimension = _noop
_default_get_telemetry_client = lambda: None
_default_flush = _noop
_default_is_telemetry_enabled = lambda: False
_default_is_telemetry_globally_disabled = lambda: True
# Set the actual functions to the defaults initially
record_event = _default_record_event
increment_counter = _default_increment_counter
set_dimension = _default_set_dimension
get_telemetry_client = _default_get_telemetry_client
flush = _default_flush
is_telemetry_enabled = _default_is_telemetry_enabled
is_telemetry_globally_disabled = _default_is_telemetry_globally_disabled
logger = logging.getLogger("agent.telemetry")
try:
# Import from core telemetry
from core.telemetry import (
record_event as core_record_event,
increment as core_increment,
get_telemetry_client as core_get_telemetry_client,
flush as core_flush,
is_telemetry_enabled as core_is_telemetry_enabled,
is_telemetry_globally_disabled as core_is_telemetry_globally_disabled,
)
# Override the default functions with actual implementations
record_event = core_record_event
get_telemetry_client = core_get_telemetry_client
flush = core_flush
is_telemetry_enabled = core_is_telemetry_enabled
is_telemetry_globally_disabled = core_is_telemetry_globally_disabled
def increment_counter(counter_name: str, value: int = 1) -> None:
"""Wrapper for increment to maintain backward compatibility."""
if is_telemetry_enabled():
core_increment(counter_name, value)
def set_dimension(name: str, value: Any) -> None:
"""Set a dimension that will be attached to all events."""
logger.debug(f"Setting dimension {name}={value}")
TELEMETRY_AVAILABLE = True
logger.info("Successfully imported telemetry")
except ImportError as e:
logger.warning(f"Could not import telemetry: {e}")
logger.debug("Telemetry not available, using no-op functions")
# Get system info once to use in telemetry
SYSTEM_INFO = {
"os": platform.system().lower(),
"os_version": platform.release(),
"python_version": platform.python_version(),
}
def enable_telemetry() -> bool:
"""Enable telemetry if available.
Returns:
bool: True if telemetry was successfully enabled, False otherwise
"""
global TELEMETRY_AVAILABLE, record_event, increment_counter, get_telemetry_client, flush, is_telemetry_enabled, is_telemetry_globally_disabled
# Check if globally disabled using core function
if TELEMETRY_AVAILABLE and is_telemetry_globally_disabled():
logger.info("Telemetry is globally disabled via environment variable - cannot enable")
return False
# Already enabled
if TELEMETRY_AVAILABLE:
return True
# Try to import and enable
try:
from core.telemetry import (
record_event,
increment,
get_telemetry_client,
flush,
is_telemetry_globally_disabled,
)
# Check again after import
if is_telemetry_globally_disabled():
logger.info("Telemetry is globally disabled via environment variable - cannot enable")
return False
TELEMETRY_AVAILABLE = True
logger.info("Telemetry successfully enabled")
return True
except ImportError as e:
logger.warning(f"Could not enable telemetry: {e}")
return False
def is_telemetry_enabled() -> bool:
"""Check if telemetry is enabled.
Returns:
bool: True if telemetry is enabled, False otherwise
"""
# Use the core function if available, otherwise use our local flag
if TELEMETRY_AVAILABLE:
from core.telemetry import is_telemetry_enabled as core_is_enabled
return core_is_enabled()
return False
def record_agent_initialization() -> None:
"""Record when an agent instance is initialized."""
if TELEMETRY_AVAILABLE and is_telemetry_enabled():
record_event("agent_initialized", SYSTEM_INFO)
# Set dimensions that will be attached to all events
set_dimension("os", SYSTEM_INFO["os"])
set_dimension("os_version", SYSTEM_INFO["os_version"])
set_dimension("python_version", SYSTEM_INFO["python_version"])