Consolidate telemetry logic

This commit is contained in:
James Murdza
2025-08-11 10:25:37 -04:00
parent 1832f53075
commit c14895e93a

View File

@@ -20,21 +20,6 @@ logger = logging.getLogger("core.telemetry")
PUBLIC_POSTHOG_API_KEY = "phc_eSkLnbLxsnYFaXksif1ksbrNzYlJShr35miFLDppF14"
PUBLIC_POSTHOG_HOST = "https://eu.i.posthog.com"
def get_posthog_config() -> dict:
"""Get PostHog configuration for anonymous telemetry.
Uses the public API key that's specifically intended for anonymous telemetry collection.
No private keys are used or required from users.
Returns:
Dict with PostHog configuration
"""
# Return the public config
logger.debug("Using public PostHog configuration")
return {"api_key": PUBLIC_POSTHOG_API_KEY, "host": PUBLIC_POSTHOG_HOST}
class PostHogTelemetryClient:
"""Collects and reports telemetry data via PostHog."""
@@ -45,13 +30,40 @@ class PostHogTelemetryClient:
self.queued_events: List[Dict[str, Any]] = []
# Log telemetry status on startup
if not _telemetry_disabled():
if not self._telemetry_disabled():
logger.info(f"Telemetry enabled")
# Initialize PostHog client if config is available
self._initialize_posthog()
else:
logger.info("Telemetry disabled")
@staticmethod
def _telemetry_disabled() -> bool:
"""Return ``True`` when telemetry is explicitly disabled via environment variables."""
return (
os.environ.get("CUA_TELEMETRY", "").lower() == "off"
or os.environ.get("CUA_TELEMETRY_ENABLED", "true").lower()
not in {"1", "true", "yes", "on"}
)
@staticmethod
def _get_posthog_config() -> dict:
"""Return PostHog configuration for anonymous telemetry (public credentials)."""
logger.debug("Using public PostHog configuration (from class method)")
return {
"api_key": PUBLIC_POSTHOG_API_KEY,
"host": PUBLIC_POSTHOG_HOST,
}
# ------------------------------------------------------------------
# Public helpers
# ------------------------------------------------------------------
@classmethod
def is_telemetry_enabled(cls) -> bool:
"""True if telemetry is currently active for this process."""
return not cls._telemetry_disabled()
def _initialize_posthog(self) -> bool:
"""Initialize the PostHog client with configuration.
@@ -61,7 +73,7 @@ class PostHogTelemetryClient:
if self.initialized:
return True
posthog_config = get_posthog_config()
posthog_config = self._get_posthog_config()
try:
# Initialize the PostHog client
@@ -177,6 +189,11 @@ class PostHogTelemetryClient:
event_name: Name of the event
properties: Event properties (must not contain sensitive data)
"""
# Respect runtime telemetry opt-out.
if self._telemetry_disabled():
logger.debug("Telemetry disabled; event not recorded.")
return
event_properties = {"version": __version__, **(properties or {})}
logger.info(f"Recording event: {event_name} with properties: {event_properties}")
@@ -233,32 +250,14 @@ def get_posthog_telemetry_client() -> PostHogTelemetryClient:
return _client
# ---------------------------------------------------------------------------
# Lightweight wrapper functions (migrated from telemetry.py)
# ---------------------------------------------------------------------------
def _telemetry_disabled() -> bool:
return (
os.environ.get("CUA_TELEMETRY", "").lower() == "off"
or os.environ.get("CUA_TELEMETRY_ENABLED", "true").lower() not in {"1", "true", "yes", "on"}
)
def destroy_telemetry_client() -> None:
"""Destroy the global telemetry client instance."""
global _client
_client = None
def is_telemetry_enabled() -> bool:
"""Return True if telemetry is currently active."""
return not _telemetry_disabled()
return PostHogTelemetryClient.is_telemetry_enabled()
def record_event(event_name: str, properties: Optional[Dict[str, Any]] | None = None) -> None:
"""Record an arbitrary PostHog event."""
if _telemetry_disabled():
return
client = get_posthog_telemetry_client()
if client:
client.record_event(event_name, properties or {})
get_posthog_telemetry_client().record_event(event_name, properties or {})