Merge pull request #336 from jamesmurdza/fix/refactor-python-telemetry

Refactor Python telemetry library
This commit is contained in:
James Murdza
2025-08-18 15:31:45 -04:00
committed by GitHub
17 changed files with 176 additions and 1102 deletions

View File

@@ -3,27 +3,15 @@
It provides a low-overhead way to collect anonymous usage data.
"""
from core.telemetry.telemetry import (
UniversalTelemetryClient,
enable_telemetry,
disable_telemetry,
flush,
get_telemetry_client,
increment,
from core.telemetry.posthog import (
record_event,
is_telemetry_enabled,
is_telemetry_globally_disabled,
destroy_telemetry_client,
)
__all__ = [
"UniversalTelemetryClient",
"enable_telemetry",
"disable_telemetry",
"flush",
"get_telemetry_client",
"increment",
"record_event",
"is_telemetry_enabled",
"is_telemetry_globally_disabled",
"destroy_telemetry_client",
]

View File

@@ -1,233 +0,0 @@
"""Telemetry client for collecting anonymous usage data."""
from __future__ import annotations
import json
import logging
import os
import random
import time
import uuid
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
from core import __version__
from core.telemetry.sender import send_telemetry
logger = logging.getLogger("core.telemetry")
# Controls how frequently telemetry will be sent (percentage)
TELEMETRY_SAMPLE_RATE = 5 # 5% sampling rate
@dataclass
class TelemetryConfig:
"""Configuration for telemetry collection."""
enabled: bool = False # Default to opt-in
sample_rate: float = TELEMETRY_SAMPLE_RATE
project_root: Optional[Path] = None
@classmethod
def from_env(cls, project_root: Optional[Path] = None) -> TelemetryConfig:
"""Load config from environment variables."""
# CUA_TELEMETRY should be set to "on" to enable telemetry (opt-in)
return cls(
enabled=os.environ.get("CUA_TELEMETRY", "").lower() == "on",
sample_rate=float(os.environ.get("CUA_TELEMETRY_SAMPLE_RATE", TELEMETRY_SAMPLE_RATE)),
project_root=project_root,
)
def to_dict(self) -> Dict[str, Any]:
"""Convert config to dictionary."""
return {
"enabled": self.enabled,
"sample_rate": self.sample_rate,
}
class TelemetryClient:
"""Collects and reports telemetry data with transparency and sampling."""
def __init__(
self, project_root: Optional[Path] = None, config: Optional[TelemetryConfig] = None
):
"""Initialize telemetry client.
Args:
project_root: Root directory of the project
config: Telemetry configuration, or None to load from environment
"""
self.config = config or TelemetryConfig.from_env(project_root)
self.installation_id = self._get_or_create_installation_id()
self.counters: Dict[str, int] = {}
self.events: List[Dict[str, Any]] = []
self.start_time = time.time()
# Log telemetry status on startup
if self.config.enabled:
logger.info(f"Telemetry enabled (sampling at {self.config.sample_rate}%)")
else:
logger.info("Telemetry disabled")
# Create .cua directory if it doesn't exist and config is provided
if self.config.project_root:
self._setup_local_storage()
def _get_or_create_installation_id(self) -> str:
"""Get or create a random installation ID.
This ID is not tied to any personal information.
"""
if self.config.project_root:
id_file = self.config.project_root / ".cua" / "installation_id"
if id_file.exists():
try:
return id_file.read_text().strip()
except Exception:
pass
# Create new ID if not exists
new_id = str(uuid.uuid4())
try:
id_file.parent.mkdir(parents=True, exist_ok=True)
id_file.write_text(new_id)
return new_id
except Exception:
pass
# Fallback to in-memory ID if file operations fail
return str(uuid.uuid4())
def _setup_local_storage(self) -> None:
"""Create local storage directories and files."""
if not self.config.project_root:
return
cua_dir = self.config.project_root / ".cua"
cua_dir.mkdir(parents=True, exist_ok=True)
# Store telemetry config
config_path = cua_dir / "telemetry_config.json"
with open(config_path, "w") as f:
json.dump(self.config.to_dict(), f)
def increment(self, counter_name: str, value: int = 1) -> None:
"""Increment a named counter.
Args:
counter_name: Name of the counter
value: Amount to increment by (default: 1)
"""
if not self.config.enabled:
return
if counter_name not in self.counters:
self.counters[counter_name] = 0
self.counters[counter_name] += value
def record_event(self, event_name: str, properties: Optional[Dict[str, Any]] = None) -> None:
"""Record an event with optional properties.
Args:
event_name: Name of the event
properties: Event properties (must not contain sensitive data)
"""
if not self.config.enabled:
return
# Increment counter for this event type
counter_key = f"event:{event_name}"
self.increment(counter_key)
# Record event details for deeper analysis (if sampled)
if properties and random.random() * 100 <= self.config.sample_rate:
self.events.append(
{"name": event_name, "properties": properties, "timestamp": time.time()}
)
def flush(self) -> bool:
"""Send collected telemetry if sampling criteria is met.
Returns:
bool: True if telemetry was sent, False otherwise
"""
if not self.config.enabled or (not self.counters and not self.events):
return False
# Apply sampling - only send data for a percentage of installations
if random.random() * 100 > self.config.sample_rate:
logger.debug("Telemetry sampled out")
self.counters.clear()
self.events.clear()
return False
# Prepare telemetry payload
payload = {
"version": __version__,
"installation_id": self.installation_id,
"counters": self.counters.copy(),
"events": self.events.copy(),
"duration": time.time() - self.start_time,
"timestamp": time.time(),
}
try:
# Send telemetry data
success = send_telemetry(payload)
if success:
logger.debug(
f"Telemetry sent: {len(self.counters)} counters, {len(self.events)} events"
)
else:
logger.debug("Failed to send telemetry")
return success
except Exception as e:
logger.debug(f"Failed to send telemetry: {e}")
return False
finally:
# Clear data after sending
self.counters.clear()
self.events.clear()
def enable(self) -> None:
"""Enable telemetry collection."""
self.config.enabled = True
logger.info("Telemetry enabled")
if self.config.project_root:
self._setup_local_storage()
def disable(self) -> None:
"""Disable telemetry collection."""
self.config.enabled = False
logger.info("Telemetry disabled")
if self.config.project_root:
self._setup_local_storage()
# Global telemetry client instance
_client: Optional[TelemetryClient] = None
def get_telemetry_client(project_root: Optional[Path] = None) -> TelemetryClient:
"""Get or initialize the global telemetry client.
Args:
project_root: Root directory of the project
Returns:
The global telemetry client instance
"""
global _client
if _client is None:
_client = TelemetryClient(project_root)
return _client
def disable_telemetry() -> None:
"""Disable telemetry collection globally."""
if _client is not None:
_client.disable()

View File

@@ -1,37 +0,0 @@
"""Models for telemetry data."""
from __future__ import annotations
from datetime import datetime
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
class TelemetryEvent(BaseModel):
"""A telemetry event with properties."""
name: str
properties: Dict[str, Any] = Field(default_factory=dict)
timestamp: float = Field(default_factory=lambda: datetime.now().timestamp())
class TelemetryPayload(BaseModel):
"""Telemetry payload sent to the server."""
version: str
installation_id: str
counters: Dict[str, int] = Field(default_factory=dict)
events: List[TelemetryEvent] = Field(default_factory=list)
duration: float = 0
timestamp: float = Field(default_factory=lambda: datetime.now().timestamp())
class UserRecord(BaseModel):
"""User record stored in the telemetry database."""
id: str
version: Optional[str] = None
created_at: Optional[datetime] = None
last_seen_at: Optional[datetime] = None
is_ci: bool = False

View File

@@ -2,14 +2,10 @@
from __future__ import annotations
import json
import logging
import os
import random
import time
import uuid
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional
@@ -18,156 +14,41 @@ from core import __version__
logger = logging.getLogger("core.telemetry")
# Controls how frequently telemetry will be sent (percentage)
TELEMETRY_SAMPLE_RATE = 100 # 100% sampling rate (was 5%)
# Public PostHog config for anonymous telemetry
# These values are intentionally public and meant for anonymous telemetry only
# https://posthog.com/docs/product-analytics/troubleshooting#is-it-ok-for-my-api-key-to-be-exposed-and-public
PUBLIC_POSTHOG_API_KEY = "phc_eSkLnbLxsnYFaXksif1ksbrNzYlJShr35miFLDppF14"
PUBLIC_POSTHOG_HOST = "https://eu.i.posthog.com"
@dataclass
class TelemetryConfig:
"""Configuration for telemetry collection."""
enabled: bool = True # Default to enabled (opt-out)
sample_rate: float = TELEMETRY_SAMPLE_RATE
@classmethod
def from_env(cls) -> TelemetryConfig:
"""Load config from environment variables."""
# Check for multiple environment variables that can disable telemetry:
# CUA_TELEMETRY=off to disable telemetry (legacy way)
# CUA_TELEMETRY_DISABLED=1 to disable telemetry (new, more explicit way)
telemetry_disabled = os.environ.get("CUA_TELEMETRY", "").lower() == "off" or os.environ.get(
"CUA_TELEMETRY_DISABLED", ""
).lower() in ("1", "true", "yes", "on")
return cls(
enabled=not telemetry_disabled,
sample_rate=float(os.environ.get("CUA_TELEMETRY_SAMPLE_RATE", TELEMETRY_SAMPLE_RATE)),
)
def to_dict(self) -> Dict[str, Any]:
"""Convert config to dictionary."""
return {
"enabled": self.enabled,
"sample_rate": self.sample_rate,
}
def get_posthog_config() -> dict:
"""Get PostHog configuration for anonymous telemetry.
Uses the public API key that's specifically intended for anonymous telemetry collection.
No private keys are used or required from users.
Returns:
Dict with PostHog configuration
"""
# Return the public config
logger.debug("Using public PostHog configuration")
return {"api_key": PUBLIC_POSTHOG_API_KEY, "host": PUBLIC_POSTHOG_HOST}
class PostHogTelemetryClient:
"""Collects and reports telemetry data via PostHog."""
# Global singleton (class-managed)
_singleton: Optional["PostHogTelemetryClient"] = None
def __init__(self):
"""Initialize PostHog telemetry client."""
self.config = TelemetryConfig.from_env()
self.installation_id = self._get_or_create_installation_id()
self.initialized = False
self.queued_events: List[Dict[str, Any]] = []
self.start_time = time.time()
# Log telemetry status on startup
if self.config.enabled:
logger.info(f"Telemetry enabled (sampling at {self.config.sample_rate}%)")
if self.is_telemetry_enabled():
logger.info("Telemetry enabled")
# Initialize PostHog client if config is available
self._initialize_posthog()
else:
logger.info("Telemetry disabled")
def _initialize_posthog(self) -> bool:
"""Initialize the PostHog client with configuration.
Returns:
bool: True if initialized successfully, False otherwise
"""
if self.initialized:
return True
posthog_config = get_posthog_config()
try:
# Initialize the PostHog client
posthog.api_key = posthog_config["api_key"]
posthog.host = posthog_config["host"]
# Configure the client
posthog.debug = os.environ.get("CUA_TELEMETRY_DEBUG", "").lower() == "on"
posthog.disabled = not self.config.enabled
# Log telemetry status
if not posthog.disabled:
logger.info(
f"Initializing PostHog telemetry with installation ID: {self.installation_id}"
)
if posthog.debug:
logger.debug(f"PostHog API Key: {posthog.api_key}")
logger.debug(f"PostHog Host: {posthog.host}")
else:
logger.info("PostHog telemetry is disabled")
# Identify this installation
self._identify()
# Process any queued events
for event in self.queued_events:
posthog.capture(
distinct_id=self.installation_id,
event=event["event"],
properties=event["properties"],
)
self.queued_events = []
self.initialized = True
return True
except Exception as e:
logger.warning(f"Failed to initialize PostHog: {e}")
return False
def _identify(self) -> None:
"""Set up user properties for the current installation with PostHog.
Note: The Python PostHog SDK doesn't have an identify() method like the web SDK.
Instead, we capture an identification event with user properties.
"""
try:
properties = {
"version": __version__,
"is_ci": "CI" in os.environ,
"os": os.name,
"python_version": sys.version.split()[0],
}
logger.debug(
f"Setting up PostHog user properties for: {self.installation_id} with properties: {properties}"
)
# In the Python SDK, we capture an identification event instead of calling identify()
posthog.capture(
distinct_id=self.installation_id,
event="$identify",
properties={"$set": properties}
)
logger.info(f"Set up PostHog user properties for installation: {self.installation_id}")
except Exception as e:
logger.warning(f"Failed to set up PostHog user properties: {e}")
@classmethod
def is_telemetry_enabled(cls) -> bool:
"""True if telemetry is currently active for this process."""
return (
# Legacy opt-out flag
os.environ.get("CUA_TELEMETRY", "").lower() != "off"
# Opt-in flag (defaults to enabled)
and os.environ.get("CUA_TELEMETRY_ENABLED", "true").lower() in { "1", "true", "yes", "on" }
)
def _get_or_create_installation_id(self) -> str:
"""Get or create a unique installation ID that persists across runs.
@@ -213,40 +94,73 @@ class PostHogTelemetryClient:
logger.warning("Using random installation ID (will not persist across runs)")
return str(uuid.uuid4())
def increment(self, counter_name: str, value: int = 1) -> None:
"""Increment a named counter.
def _initialize_posthog(self) -> bool:
"""Initialize the PostHog client with configuration.
Args:
counter_name: Name of the counter
value: Amount to increment by (default: 1)
Returns:
bool: True if initialized successfully, False otherwise
"""
if not self.config.enabled:
return
# Apply sampling to reduce number of events
if random.random() * 100 > self.config.sample_rate:
return
properties = {
"value": value,
"counter_name": counter_name,
"version": __version__,
}
if self.initialized:
try:
return True
try:
# Allow overrides from environment for testing/region control
posthog.api_key = PUBLIC_POSTHOG_API_KEY
posthog.host = PUBLIC_POSTHOG_HOST
# Configure the client
posthog.debug = os.environ.get("CUA_TELEMETRY_DEBUG", "").lower() == "on"
# Log telemetry status
logger.info(
f"Initializing PostHog telemetry with installation ID: {self.installation_id}"
)
if posthog.debug:
logger.debug(f"PostHog API Key: {posthog.api_key}")
logger.debug(f"PostHog Host: {posthog.host}")
# Identify this installation
self._identify()
# Process any queued events
for event in self.queued_events:
posthog.capture(
distinct_id=self.installation_id,
event="counter_increment",
properties=properties,
event=event["event"],
properties=event["properties"],
)
except Exception as e:
logger.debug(f"Failed to send counter event to PostHog: {e}")
else:
# Queue the event for later
self.queued_events.append({"event": "counter_increment", "properties": properties})
# Try to initialize now if not already
self._initialize_posthog()
self.queued_events = []
self.initialized = True
return True
except Exception as e:
logger.warning(f"Failed to initialize PostHog: {e}")
return False
def _identify(self) -> None:
"""Set up user properties for the current installation with PostHog."""
try:
properties = {
"version": __version__,
"is_ci": "CI" in os.environ,
"os": os.name,
"python_version": sys.version.split()[0],
}
logger.debug(
f"Setting up PostHog user properties for: {self.installation_id} with properties: {properties}"
)
# In the Python SDK, we capture an identification event instead of calling identify()
posthog.capture(
distinct_id=self.installation_id,
event="$identify",
properties={"$set": properties}
)
logger.info(f"Set up PostHog user properties for installation: {self.installation_id}")
except Exception as e:
logger.warning(f"Failed to set up PostHog user properties: {e}")
def record_event(self, event_name: str, properties: Optional[Dict[str, Any]] = None) -> None:
"""Record an event with optional properties.
@@ -255,15 +169,9 @@ class PostHogTelemetryClient:
event_name: Name of the event
properties: Event properties (must not contain sensitive data)
"""
if not self.config.enabled:
logger.debug(f"Telemetry disabled, skipping event: {event_name}")
return
# Apply sampling to reduce number of events
if random.random() * 100 > self.config.sample_rate:
logger.debug(
f"Event sampled out due to sampling rate {self.config.sample_rate}%: {event_name}"
)
# Respect runtime telemetry opt-out.
if not self.is_telemetry_enabled():
logger.debug("Telemetry disabled; event not recorded.")
return
event_properties = {"version": __version__, **(properties or {})}
@@ -294,9 +202,6 @@ class PostHogTelemetryClient:
Returns:
bool: True if successful, False otherwise
"""
if not self.config.enabled:
return False
if not self.initialized and not self._initialize_posthog():
return False
@@ -307,41 +212,25 @@ class PostHogTelemetryClient:
logger.debug(f"Failed to flush PostHog events: {e}")
return False
def enable(self) -> None:
"""Enable telemetry collection."""
self.config.enabled = True
if posthog:
posthog.disabled = False
logger.info("Telemetry enabled")
self._initialize_posthog()
@classmethod
def get_client(cls) -> "PostHogTelemetryClient":
"""Return the global PostHogTelemetryClient instance, creating it if needed."""
if cls._singleton is None:
cls._singleton = cls()
return cls._singleton
def disable(self) -> None:
"""Disable telemetry collection."""
self.config.enabled = False
if posthog:
posthog.disabled = True
logger.info("Telemetry disabled")
@classmethod
def destroy_client(cls) -> None:
"""Destroy the global PostHogTelemetryClient instance."""
cls._singleton = None
def destroy_telemetry_client() -> None:
"""Destroy the global PostHogTelemetryClient instance (class-managed)."""
PostHogTelemetryClient.destroy_client()
# Global telemetry client instance
_client: Optional[PostHogTelemetryClient] = None
def is_telemetry_enabled() -> bool:
return PostHogTelemetryClient.is_telemetry_enabled()
def get_posthog_telemetry_client() -> PostHogTelemetryClient:
"""Get or initialize the global PostHog telemetry client.
Returns:
The global telemetry client instance
"""
global _client
if _client is None:
_client = PostHogTelemetryClient()
return _client
def disable_telemetry() -> None:
"""Disable telemetry collection globally."""
if _client is not None:
_client.disable()
def record_event(event_name: str, properties: Optional[Dict[str, Any]] | None = None) -> None:
"""Record an arbitrary PostHog event."""
PostHogTelemetryClient.get_client().record_event(event_name, properties or {})

View File

@@ -1,24 +0,0 @@
"""Telemetry sender module for sending anonymous usage data."""
import logging
from typing import Any, Dict
logger = logging.getLogger("core.telemetry")
def send_telemetry(payload: Dict[str, Any]) -> bool:
"""Send telemetry data to collection endpoint.
Args:
payload: Telemetry data to send
Returns:
bool: True if sending was successful, False otherwise
"""
try:
# For now, just log the payload and return success
logger.debug(f"Would send telemetry: {payload}")
return True
except Exception as e:
logger.debug(f"Error sending telemetry: {e}")
return False

View File

@@ -1,310 +0,0 @@
"""Universal telemetry module for collecting anonymous usage data.
This module provides a unified interface for telemetry collection,
using PostHog as the backend.
"""
from __future__ import annotations
import logging
import os
import sys
from enum import Enum
from pathlib import Path
from typing import Any, Dict, Optional, Union
# Configure telemetry logging before importing anything else
# By default, set telemetry loggers to WARNING level to hide INFO messages
# This can be overridden with CUA_TELEMETRY_LOG_LEVEL environment variable
def _configure_telemetry_logging() -> None:
"""Set up initial logging configuration for telemetry."""
# Determine log level from environment variable or use WARNING by default
env_level = os.environ.get("CUA_TELEMETRY_LOG_LEVEL", "WARNING").upper()
level = logging.WARNING # Default to WARNING to hide INFO messages
if env_level == "DEBUG":
level = logging.DEBUG
elif env_level == "INFO":
level = logging.INFO
elif env_level == "ERROR":
level = logging.ERROR
# Configure the main telemetry logger
telemetry_logger = logging.getLogger("core.telemetry")
telemetry_logger.setLevel(level)
# Configure logging immediately
_configure_telemetry_logging()
# Import telemetry backend
try:
from core.telemetry.posthog_client import (
PostHogTelemetryClient,
get_posthog_telemetry_client,
)
POSTHOG_AVAILABLE = True
except ImportError:
logger = logging.getLogger("core.telemetry")
logger.info("PostHog not available. Install with: pdm add posthog")
POSTHOG_AVAILABLE = False
logger = logging.getLogger("core.telemetry")
# Check environment variables for global telemetry opt-out
def is_telemetry_globally_disabled() -> bool:
"""Check if telemetry is globally disabled via environment variables.
Returns:
bool: True if telemetry is globally disabled, False otherwise
"""
# Only check for CUA_TELEMETRY_ENABLED - telemetry is enabled only if explicitly set to a truthy value
telemetry_enabled = os.environ.get("CUA_TELEMETRY_ENABLED", "true").lower()
return telemetry_enabled not in ("1", "true", "yes", "on")
class TelemetryBackend(str, Enum):
"""Available telemetry backend types."""
POSTHOG = "posthog"
NONE = "none"
class UniversalTelemetryClient:
"""Universal telemetry client that delegates to the PostHog backend."""
def __init__(
self,
backend: Optional[str] = None,
):
"""Initialize the universal telemetry client.
Args:
backend: Backend to use ("posthog" or "none")
If not specified, will try PostHog
"""
# Check for global opt-out first
if is_telemetry_globally_disabled():
self.backend_type = TelemetryBackend.NONE
logger.info("Telemetry globally disabled via environment variable")
# Determine which backend to use
elif backend and backend.lower() == "none":
self.backend_type = TelemetryBackend.NONE
else:
# Auto-detect based on environment variables and available backends
if POSTHOG_AVAILABLE:
self.backend_type = TelemetryBackend.POSTHOG
else:
self.backend_type = TelemetryBackend.NONE
logger.warning("PostHog is not available, telemetry will be disabled")
# Initialize the appropriate client
self._client = self._initialize_client()
self._enabled = self.backend_type != TelemetryBackend.NONE
def _initialize_client(self) -> Any:
"""Initialize the appropriate telemetry client based on the selected backend."""
if self.backend_type == TelemetryBackend.POSTHOG and POSTHOG_AVAILABLE:
logger.debug("Initializing PostHog telemetry client")
return get_posthog_telemetry_client()
else:
logger.debug("No telemetry client initialized")
return None
def increment(self, counter_name: str, value: int = 1) -> None:
"""Increment a named counter.
Args:
counter_name: Name of the counter
value: Amount to increment by (default: 1)
"""
if self._client and self._enabled:
self._client.increment(counter_name, value)
def record_event(self, event_name: str, properties: Optional[Dict[str, Any]] = None) -> None:
"""Record an event with optional properties.
Args:
event_name: Name of the event
properties: Event properties (must not contain sensitive data)
"""
if self._client and self._enabled:
self._client.record_event(event_name, properties)
def flush(self) -> bool:
"""Flush any pending events to the backend.
Returns:
bool: True if successful, False otherwise
"""
if self._client and self._enabled:
return self._client.flush()
return False
def enable(self) -> None:
"""Enable telemetry collection."""
if self._client and not is_telemetry_globally_disabled():
self._client.enable()
self._enabled = True
else:
if is_telemetry_globally_disabled():
logger.info("Cannot enable telemetry: globally disabled via environment variable")
self._enabled = False
def disable(self) -> None:
"""Disable telemetry collection."""
if self._client:
self._client.disable()
self._enabled = False
def is_enabled(self) -> bool:
"""Check if telemetry is enabled.
Returns:
bool: True if telemetry is enabled, False otherwise
"""
return self._enabled and not is_telemetry_globally_disabled()
# Global telemetry client instance
_universal_client: Optional[UniversalTelemetryClient] = None
def get_telemetry_client(
backend: Optional[str] = None,
) -> UniversalTelemetryClient:
"""Get or initialize the global telemetry client.
Args:
backend: Backend to use ("posthog" or "none")
Returns:
The global telemetry client instance
"""
global _universal_client
if _universal_client is None:
_universal_client = UniversalTelemetryClient(backend)
return _universal_client
def increment(counter_name: str, value: int = 1) -> None:
"""Increment a named counter using the global telemetry client.
Args:
counter_name: Name of the counter
value: Amount to increment by (default: 1)
"""
client = get_telemetry_client()
client.increment(counter_name, value)
def record_event(event_name: str, properties: Optional[Dict[str, Any]] = None) -> None:
"""Record an event with optional properties using the global telemetry client.
Args:
event_name: Name of the event
properties: Event properties (must not contain sensitive data)
"""
client = get_telemetry_client()
client.record_event(event_name, properties)
def flush() -> bool:
"""Flush any pending events using the global telemetry client.
Returns:
bool: True if successful, False otherwise
"""
client = get_telemetry_client()
return client.flush()
def enable_telemetry() -> bool:
"""Enable telemetry collection globally.
Returns:
bool: True if successfully enabled, False if globally disabled
"""
if is_telemetry_globally_disabled():
logger.info("Cannot enable telemetry: globally disabled via environment variable")
return False
client = get_telemetry_client()
client.enable()
return True
def disable_telemetry() -> None:
"""Disable telemetry collection globally."""
client = get_telemetry_client()
client.disable()
def is_telemetry_enabled() -> bool:
"""Check if telemetry is enabled.
Returns:
bool: True if telemetry is enabled, False otherwise
"""
# First check for global disable
if is_telemetry_globally_disabled():
return False
# Get the global client and check
client = get_telemetry_client()
return client.is_enabled()
def set_telemetry_log_level(level: Optional[int] = None) -> None:
"""Set the logging level for telemetry loggers to reduce console output.
By default, checks the CUA_TELEMETRY_LOG_LEVEL environment variable:
- If set to "DEBUG", sets level to logging.DEBUG
- If set to "INFO", sets level to logging.INFO
- If set to "WARNING", sets level to logging.WARNING
- If set to "ERROR", sets level to logging.ERROR
- If not set, defaults to logging.WARNING
This means telemetry logs will only show up when explicitly requested via
the environment variable, not during normal operation.
Args:
level: The logging level to set (overrides environment variable if provided)
"""
# Determine the level from environment variable if not explicitly provided
if level is None:
env_level = os.environ.get("CUA_TELEMETRY_LOG_LEVEL", "WARNING").upper()
if env_level == "DEBUG":
level = logging.DEBUG
elif env_level == "INFO":
level = logging.INFO
elif env_level == "WARNING":
level = logging.WARNING
elif env_level == "ERROR":
level = logging.ERROR
else:
# Default to WARNING if environment variable is not recognized
level = logging.WARNING
# Set the level for all telemetry-related loggers
telemetry_loggers = [
"core.telemetry",
"agent.telemetry",
"computer.telemetry",
"posthog",
]
for logger_name in telemetry_loggers:
try:
logging.getLogger(logger_name).setLevel(level)
except Exception:
pass
# Set telemetry loggers to appropriate level based on environment variable
# This is called at module import time to ensure proper configuration before any logging happens
set_telemetry_log_level()