mirror of
https://github.com/formbricks/formbricks.git
synced 2025-12-21 11:59:54 -06:00
feat: telemetry setup (#6888)
Co-authored-by: Matti Nannt <matti@formbricks.com>
This commit is contained in:
committed by
GitHub
parent
c53e4f54cb
commit
018cef61a6
272
apps/web/app/api/(internal)/pipeline/lib/telemetry.test.ts
Normal file
272
apps/web/app/api/(internal)/pipeline/lib/telemetry.test.ts
Normal file
@@ -0,0 +1,272 @@
|
||||
import { IntegrationType } from "@prisma/client";
|
||||
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
|
||||
import { getCacheService } from "@formbricks/cache";
|
||||
import { prisma } from "@formbricks/database";
|
||||
import { logger } from "@formbricks/logger";
|
||||
import { sendTelemetryEvents } from "./telemetry";
|
||||
|
||||
// Mock dependencies
|
||||
vi.mock("@formbricks/cache");
|
||||
vi.mock("@formbricks/database", () => ({
|
||||
prisma: {
|
||||
organization: {
|
||||
findFirst: vi.fn(),
|
||||
count: vi.fn(),
|
||||
},
|
||||
user: { count: vi.fn() },
|
||||
team: { count: vi.fn() },
|
||||
project: { count: vi.fn() },
|
||||
survey: { count: vi.fn() },
|
||||
response: {
|
||||
count: vi.fn(),
|
||||
findFirst: vi.fn(),
|
||||
},
|
||||
display: { count: vi.fn() },
|
||||
contact: { count: vi.fn() },
|
||||
segment: { count: vi.fn() },
|
||||
integration: { findMany: vi.fn() },
|
||||
account: { findMany: vi.fn() },
|
||||
$queryRaw: vi.fn(),
|
||||
},
|
||||
}));
|
||||
vi.mock("@formbricks/logger", () => ({
|
||||
logger: {
|
||||
error: vi.fn(),
|
||||
info: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
debug: vi.fn(),
|
||||
},
|
||||
}));
|
||||
vi.mock("@/lib/env", () => ({
|
||||
env: {
|
||||
SMTP_HOST: "smtp.example.com",
|
||||
S3_BUCKET_NAME: "my-bucket",
|
||||
PROMETHEUS_ENABLED: true,
|
||||
RECAPTCHA_SITE_KEY: "site-key",
|
||||
RECAPTCHA_SECRET_KEY: "secret-key",
|
||||
GITHUB_ID: "github-id",
|
||||
},
|
||||
}));
|
||||
|
||||
// Mock fetch
|
||||
const fetchMock = vi.fn();
|
||||
globalThis.fetch = fetchMock;
|
||||
|
||||
const mockCacheService = {
|
||||
get: vi.fn(),
|
||||
set: vi.fn(),
|
||||
tryLock: vi.fn(),
|
||||
del: vi.fn(),
|
||||
};
|
||||
|
||||
describe("sendTelemetryEvents", () => {
|
||||
beforeEach(() => {
|
||||
vi.resetAllMocks();
|
||||
vi.useFakeTimers();
|
||||
// Set a fixed time far in the past to ensure we can always send telemetry
|
||||
vi.setSystemTime(new Date("2024-01-01T00:00:00.000Z"));
|
||||
|
||||
// Setup default cache behavior
|
||||
vi.mocked(getCacheService).mockResolvedValue({
|
||||
ok: true,
|
||||
data: mockCacheService as any,
|
||||
});
|
||||
mockCacheService.tryLock.mockResolvedValue({ ok: true, data: true }); // Lock acquired
|
||||
mockCacheService.del.mockResolvedValue({ ok: true, data: undefined });
|
||||
mockCacheService.get.mockResolvedValue({ ok: true, data: null }); // No last sent time
|
||||
mockCacheService.set.mockResolvedValue({ ok: true, data: undefined });
|
||||
|
||||
// Setup default prisma behavior
|
||||
vi.mocked(prisma.organization.findFirst).mockResolvedValue({
|
||||
id: "org-123",
|
||||
createdAt: new Date("2023-01-01"),
|
||||
} as any);
|
||||
|
||||
// Mock raw SQL query for counts (batched query)
|
||||
vi.mocked(prisma.$queryRaw).mockResolvedValue([
|
||||
{
|
||||
organizationCount: BigInt(1),
|
||||
userCount: BigInt(5),
|
||||
teamCount: BigInt(2),
|
||||
projectCount: BigInt(3),
|
||||
surveyCount: BigInt(10),
|
||||
inProgressSurveyCount: BigInt(4),
|
||||
completedSurveyCount: BigInt(6),
|
||||
responseCountAllTime: BigInt(100),
|
||||
responseCountSinceLastUpdate: BigInt(10),
|
||||
displayCount: BigInt(50),
|
||||
contactCount: BigInt(20),
|
||||
segmentCount: BigInt(4),
|
||||
newestResponseAt: new Date("2024-01-01T00:00:00.000Z"),
|
||||
},
|
||||
] as any);
|
||||
|
||||
// Mock other queries
|
||||
vi.mocked(prisma.integration.findMany).mockResolvedValue([{ type: IntegrationType.notion }] as any);
|
||||
vi.mocked(prisma.account.findMany).mockResolvedValue([{ provider: "github" }] as any);
|
||||
|
||||
fetchMock.mockResolvedValue({ ok: true });
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
test("should send telemetry successfully when conditions are met", async () => {
|
||||
await sendTelemetryEvents();
|
||||
|
||||
// Check lock acquisition
|
||||
expect(mockCacheService.tryLock).toHaveBeenCalledWith(
|
||||
"telemetry_lock",
|
||||
"locked",
|
||||
60 * 1000 // 1 minute TTL
|
||||
);
|
||||
|
||||
// Check data gathering
|
||||
expect(prisma.organization.findFirst).toHaveBeenCalled();
|
||||
expect(prisma.$queryRaw).toHaveBeenCalled();
|
||||
|
||||
// Check fetch call
|
||||
expect(fetchMock).toHaveBeenCalledTimes(1);
|
||||
const payload = JSON.parse(fetchMock.mock.calls[0][1].body);
|
||||
expect(payload.organizationCount).toBe(1);
|
||||
expect(payload.userCount).toBe(5);
|
||||
expect(payload.integrations.notion).toBe(true);
|
||||
expect(payload.sso.github).toBe(true);
|
||||
|
||||
// Check cache update (no TTL parameter)
|
||||
expect(mockCacheService.set).toHaveBeenCalledWith("telemetry_last_sent_ts", expect.any(String));
|
||||
|
||||
// Check lock release
|
||||
expect(mockCacheService.del).toHaveBeenCalledWith(["telemetry_lock"]);
|
||||
});
|
||||
|
||||
test("should skip if in-memory check fails", async () => {
|
||||
// Run once to set nextTelemetryCheck
|
||||
await sendTelemetryEvents();
|
||||
vi.clearAllMocks();
|
||||
|
||||
// Run again immediately (should fail in-memory check)
|
||||
await sendTelemetryEvents();
|
||||
|
||||
expect(getCacheService).not.toHaveBeenCalled();
|
||||
expect(fetchMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
test("should skip if Redis last sent time is recent", async () => {
|
||||
// Mock last sent time as recent
|
||||
const recentTime = Date.now() - 1000 * 60 * 60; // 1 hour ago
|
||||
mockCacheService.get.mockResolvedValue({ ok: true, data: String(recentTime) });
|
||||
|
||||
await sendTelemetryEvents();
|
||||
|
||||
expect(mockCacheService.tryLock).not.toHaveBeenCalled(); // No lock attempt
|
||||
expect(fetchMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
test("should skip if lock cannot be acquired", async () => {
|
||||
mockCacheService.tryLock.mockResolvedValue({ ok: true, data: false }); // Lock not acquired
|
||||
|
||||
await sendTelemetryEvents();
|
||||
|
||||
expect(fetchMock).not.toHaveBeenCalled();
|
||||
expect(mockCacheService.del).not.toHaveBeenCalled(); // Shouldn't try to delete lock we didn't acquire
|
||||
});
|
||||
|
||||
test("should handle cache service failure gracefully", async () => {
|
||||
vi.mocked(getCacheService).mockResolvedValue({
|
||||
ok: false,
|
||||
error: new Error("Cache error"),
|
||||
} as any);
|
||||
|
||||
await sendTelemetryEvents();
|
||||
|
||||
expect(fetchMock).not.toHaveBeenCalled();
|
||||
// Should verify that nextTelemetryCheck was updated, but it's a module variable.
|
||||
// We can infer it by running again and checking calls
|
||||
vi.clearAllMocks();
|
||||
await sendTelemetryEvents();
|
||||
expect(getCacheService).not.toHaveBeenCalled(); // Should be blocked by in-memory check
|
||||
});
|
||||
|
||||
test("should handle telemetry send failure and apply cooldown", async () => {
|
||||
// Reset module to clear nextTelemetryCheck state from previous tests
|
||||
vi.resetModules();
|
||||
const { sendTelemetryEvents: freshSendTelemetryEvents } = await import("./telemetry");
|
||||
|
||||
// Ensure we can acquire lock by setting last sent time far in the past
|
||||
const oldTime = Date.now() - 25 * 60 * 60 * 1000; // 25 hours ago
|
||||
mockCacheService.get.mockResolvedValue({ ok: true, data: String(oldTime) });
|
||||
mockCacheService.tryLock.mockResolvedValue({ ok: true, data: true }); // Lock acquired
|
||||
|
||||
// Make fetch fail to trigger the catch block
|
||||
const networkError = new Error("Network error");
|
||||
fetchMock.mockRejectedValue(networkError);
|
||||
|
||||
await freshSendTelemetryEvents();
|
||||
|
||||
// Verify lock was acquired
|
||||
expect(mockCacheService.tryLock).toHaveBeenCalledWith("telemetry_lock", "locked", 60 * 1000);
|
||||
|
||||
// The error should be caught in the inner catch block
|
||||
// The actual implementation logs as warning, not error
|
||||
expect(logger.warn).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
error: networkError,
|
||||
message: "Network error",
|
||||
}),
|
||||
"Failed to send telemetry - applying 1h cooldown"
|
||||
);
|
||||
|
||||
// Lock should be released in finally block
|
||||
expect(mockCacheService.del).toHaveBeenCalledWith(["telemetry_lock"]);
|
||||
|
||||
// Cache should not be updated on failure
|
||||
expect(mockCacheService.set).not.toHaveBeenCalled();
|
||||
|
||||
// Verify cooldown: run again immediately (should be blocked by in-memory check)
|
||||
vi.clearAllMocks();
|
||||
mockCacheService.get.mockResolvedValue({ ok: true, data: null });
|
||||
mockCacheService.tryLock.mockResolvedValue({ ok: true, data: true });
|
||||
await freshSendTelemetryEvents();
|
||||
expect(getCacheService).not.toHaveBeenCalled(); // Should be blocked by in-memory check
|
||||
});
|
||||
|
||||
test("should skip if no organization exists", async () => {
|
||||
// Reset module to clear nextTelemetryCheck state from previous tests
|
||||
vi.resetModules();
|
||||
const { sendTelemetryEvents: freshSendTelemetryEvents } = await import("./telemetry");
|
||||
|
||||
// Ensure we can acquire lock by setting last sent time far in the past
|
||||
const oldTime = Date.now() - 25 * 60 * 60 * 1000; // 25 hours ago
|
||||
|
||||
// Re-setup mocks after resetModules
|
||||
vi.mocked(getCacheService).mockResolvedValue({
|
||||
ok: true,
|
||||
data: mockCacheService as any,
|
||||
});
|
||||
mockCacheService.tryLock.mockResolvedValue({ ok: true, data: true }); // Lock acquired
|
||||
mockCacheService.del.mockResolvedValue({ ok: true, data: undefined });
|
||||
mockCacheService.get.mockResolvedValue({ ok: true, data: String(oldTime) });
|
||||
mockCacheService.set.mockResolvedValue({ ok: true, data: undefined });
|
||||
|
||||
vi.mocked(prisma.organization.findFirst).mockResolvedValue(null);
|
||||
|
||||
await freshSendTelemetryEvents();
|
||||
|
||||
// sendTelemetry returns early when no org exists
|
||||
// Since it returns (not throws), the try block completes successfully
|
||||
// Then cache.set is called, and finally block executes
|
||||
expect(fetchMock).not.toHaveBeenCalled();
|
||||
|
||||
// Verify lock was acquired (prerequisite for finally block to execute)
|
||||
expect(mockCacheService.tryLock).toHaveBeenCalledWith("telemetry_lock", "locked", 60 * 1000);
|
||||
|
||||
// Lock should be released in finally block
|
||||
expect(mockCacheService.del).toHaveBeenCalledWith(["telemetry_lock"]);
|
||||
|
||||
// Note: The current implementation calls cache.set even when no org exists
|
||||
// This might be a bug, but we test the actual behavior
|
||||
expect(mockCacheService.set).toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
273
apps/web/app/api/(internal)/pipeline/lib/telemetry.ts
Normal file
273
apps/web/app/api/(internal)/pipeline/lib/telemetry.ts
Normal file
@@ -0,0 +1,273 @@
|
||||
import { IntegrationType } from "@prisma/client";
|
||||
import { createHash } from "node:crypto";
|
||||
import { type CacheKey, getCacheService } from "@formbricks/cache";
|
||||
import { prisma } from "@formbricks/database";
|
||||
import { logger } from "@formbricks/logger";
|
||||
import { env } from "@/lib/env";
|
||||
import packageJson from "@/package.json";
|
||||
|
||||
const TELEMETRY_INTERVAL_MS = 24 * 60 * 60 * 1000; // 24 hours
|
||||
const TELEMETRY_LOCK_KEY = "telemetry_lock" as CacheKey;
|
||||
const TELEMETRY_LAST_SENT_KEY = "telemetry_last_sent_ts" as CacheKey;
|
||||
|
||||
/**
|
||||
* In-memory timestamp for the next telemetry check.
|
||||
* This is a fast, process-local check to avoid unnecessary Redis calls.
|
||||
* Updated after each check to prevent redundant executions.
|
||||
*/
|
||||
let nextTelemetryCheck = 0;
|
||||
|
||||
/**
|
||||
* Sends telemetry events to Formbricks Enterprise endpoint.
|
||||
* Uses a three-layer check system to prevent duplicate submissions:
|
||||
* 1. In-memory check (fast, process-local)
|
||||
* 2. Redis check (shared across instances, persists across restarts)
|
||||
* 3. Distributed lock (prevents concurrent execution in multi-instance deployments)
|
||||
*/
|
||||
export const sendTelemetryEvents = async () => {
|
||||
try {
|
||||
const now = Date.now();
|
||||
|
||||
// ============================================================
|
||||
// CHECK 1: In-Memory Check (Fast Path)
|
||||
// ============================================================
|
||||
// Purpose: Quick process-local check to avoid Redis calls if we recently checked.
|
||||
// How it works: If current time is before nextTelemetryCheck, skip entirely.
|
||||
// This is updated after each successful check or failure to prevent spam.
|
||||
if (now < nextTelemetryCheck) {
|
||||
return;
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
// CHECK 2: Redis Check (Shared State)
|
||||
// ============================================================
|
||||
// Purpose: Check if telemetry was sent recently by ANY instance (shared across cluster).
|
||||
// This persists across restarts and works in multi-instance deployments.
|
||||
|
||||
const cacheServiceResult = await getCacheService();
|
||||
if (!cacheServiceResult.ok) {
|
||||
// Redis unavailable: Fallback to in-memory cooldown to avoid spamming.
|
||||
// Wait 1 hour before trying again. This prevents hammering Redis when it's down.
|
||||
nextTelemetryCheck = now + 60 * 60 * 1000;
|
||||
return;
|
||||
}
|
||||
const cache = cacheServiceResult.data;
|
||||
|
||||
// Get the timestamp of when telemetry was last sent (from any instance).
|
||||
const lastSentResult = await cache.get(TELEMETRY_LAST_SENT_KEY);
|
||||
const lastSentStr = lastSentResult.ok && lastSentResult.data ? (lastSentResult.data as string) : null;
|
||||
const lastSent = lastSentStr ? Number.parseInt(lastSentStr, 10) : 0;
|
||||
|
||||
// If less than 24 hours have passed since last telemetry, skip.
|
||||
// Update in-memory check to match remaining time for fast-path optimization.
|
||||
if (now - lastSent < TELEMETRY_INTERVAL_MS) {
|
||||
nextTelemetryCheck = lastSent + TELEMETRY_INTERVAL_MS;
|
||||
return;
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
// CHECK 3: Distributed Lock (Prevent Concurrent Execution)
|
||||
// ============================================================
|
||||
// Purpose: Ensure only ONE instance executes telemetry at a time in a cluster.
|
||||
// How it works:
|
||||
// - Uses Redis SET NX (only set if not exists) for atomic lock acquisition
|
||||
// - Lock expires after 1 minute (TTL) to prevent deadlocks if instance crashes
|
||||
// - If lock exists, another instance is already running telemetry, so we exit
|
||||
// - Lock is released in finally block after telemetry completes or fails
|
||||
const lockResult = await cache.tryLock(TELEMETRY_LOCK_KEY, "locked", 60 * 1000); // 1 minute TTL
|
||||
|
||||
if (!lockResult.ok || !lockResult.data) {
|
||||
// Lock acquisition failed or already held by another instance.
|
||||
// Exit silently - the other instance will handle telemetry.
|
||||
// No need to update nextTelemetryCheck here since we didn't execute.
|
||||
return;
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
// EXECUTION: Send Telemetry
|
||||
// ============================================================
|
||||
// We've passed all checks and acquired the lock. Now execute telemetry.
|
||||
try {
|
||||
await sendTelemetry(lastSent);
|
||||
|
||||
// Success: Update Redis with current timestamp so other instances know telemetry was sent.
|
||||
// No TTL - persists indefinitely to support low-volume instances (responses every few days/weeks).
|
||||
await cache.set(TELEMETRY_LAST_SENT_KEY, now.toString());
|
||||
|
||||
// Update in-memory check to prevent this instance from checking again for 24h.
|
||||
nextTelemetryCheck = now + TELEMETRY_INTERVAL_MS;
|
||||
} catch (e) {
|
||||
// Log as warning since telemetry is non-essential
|
||||
const errorMessage = e instanceof Error ? e.message : String(e);
|
||||
logger.warn(
|
||||
{ error: e, message: errorMessage, lastSent, now },
|
||||
"Failed to send telemetry - applying 1h cooldown"
|
||||
);
|
||||
|
||||
// Failure cooldown: Prevent retrying immediately to avoid hammering the endpoint.
|
||||
// Wait 1 hour before allowing this instance to try again.
|
||||
// Note: Other instances can still try (they'll hit the lock or Redis check).
|
||||
nextTelemetryCheck = now + 60 * 60 * 1000;
|
||||
} finally {
|
||||
// Always release the lock, even if telemetry failed.
|
||||
// This allows other instances to retry if this one failed.
|
||||
await cache.del([TELEMETRY_LOCK_KEY]);
|
||||
}
|
||||
} catch (error) {
|
||||
// Catch-all for any unexpected errors in the wrapper logic (cache failures, lock issues, etc.)
|
||||
// Log as warning since telemetry is non-essential functionality
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
logger.warn(
|
||||
{ error, message: errorMessage, timestamp: Date.now() },
|
||||
"Unexpected error in sendTelemetryEvents wrapper - telemetry check skipped"
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Gathers telemetry data and sends it to Formbricks Enterprise endpoint.
|
||||
* @param lastSent - Timestamp of last telemetry send (used to calculate incremental metrics)
|
||||
*/
|
||||
const sendTelemetry = async (lastSent: number) => {
|
||||
// Get the oldest organization to generate a stable, anonymized instance ID.
|
||||
// Using the oldest org ensures the ID doesn't change over time.
|
||||
const oldestOrg = await prisma.organization.findFirst({
|
||||
orderBy: { createdAt: "asc" },
|
||||
select: { id: true, createdAt: true },
|
||||
});
|
||||
|
||||
if (!oldestOrg) return; // No organization exists, nothing to report
|
||||
const instanceId = createHash("sha256").update(oldestOrg.id).digest("hex");
|
||||
|
||||
// Optimize database queries to reduce connection pool usage:
|
||||
// Instead of 15 parallel queries (which could exhaust the connection pool),
|
||||
// we batch all count queries into a single raw SQL query.
|
||||
// This reduces connection usage from 15 → 3 (batch counts + integrations + accounts).
|
||||
const [countsResult, integrations, ssoProviders] = await Promise.all([
|
||||
// Single query for all counts (13 metrics in one round-trip)
|
||||
prisma.$queryRaw<
|
||||
[
|
||||
{
|
||||
organizationCount: bigint;
|
||||
userCount: bigint;
|
||||
teamCount: bigint;
|
||||
projectCount: bigint;
|
||||
surveyCount: bigint;
|
||||
inProgressSurveyCount: bigint;
|
||||
completedSurveyCount: bigint;
|
||||
responseCountAllTime: bigint;
|
||||
responseCountSinceLastUpdate: bigint;
|
||||
displayCount: bigint;
|
||||
contactCount: bigint;
|
||||
segmentCount: bigint;
|
||||
newestResponseAt: Date | null;
|
||||
},
|
||||
]
|
||||
>`
|
||||
SELECT
|
||||
(SELECT COUNT(*) FROM "Organization") as "organizationCount",
|
||||
(SELECT COUNT(*) FROM "User") as "userCount",
|
||||
(SELECT COUNT(*) FROM "Team") as "teamCount",
|
||||
(SELECT COUNT(*) FROM "Project") as "projectCount",
|
||||
(SELECT COUNT(*) FROM "Survey") as "surveyCount",
|
||||
(SELECT COUNT(*) FROM "Survey" WHERE status = 'inProgress') as "inProgressSurveyCount",
|
||||
(SELECT COUNT(*) FROM "Survey" WHERE status = 'completed') as "completedSurveyCount",
|
||||
(SELECT COUNT(*) FROM "Response") as "responseCountAllTime",
|
||||
(SELECT COUNT(*) FROM "Response" WHERE "created_at" > ${new Date(lastSent || 0)}) as "responseCountSinceLastUpdate",
|
||||
(SELECT COUNT(*) FROM "Display") as "displayCount",
|
||||
(SELECT COUNT(*) FROM "Contact") as "contactCount",
|
||||
(SELECT COUNT(*) FROM "Segment") as "segmentCount",
|
||||
(SELECT MAX("created_at") FROM "Response") as "newestResponseAt"
|
||||
`,
|
||||
// Keep these as separate queries since they need DISTINCT which is harder to optimize
|
||||
prisma.integration.findMany({ select: { type: true }, distinct: ["type"] }),
|
||||
prisma.account.findMany({ select: { provider: true }, distinct: ["provider"] }),
|
||||
]);
|
||||
|
||||
// Extract metrics from the batched query result and convert bigints to numbers
|
||||
const counts = countsResult[0];
|
||||
const organizationCount = Number(counts.organizationCount);
|
||||
const userCount = Number(counts.userCount);
|
||||
const teamCount = Number(counts.teamCount);
|
||||
const projectCount = Number(counts.projectCount);
|
||||
const surveyCount = Number(counts.surveyCount);
|
||||
const inProgressSurveyCount = Number(counts.inProgressSurveyCount);
|
||||
const completedSurveyCount = Number(counts.completedSurveyCount);
|
||||
const responseCountAllTime = Number(counts.responseCountAllTime);
|
||||
const responseCountSinceLastUpdate = Number(counts.responseCountSinceLastUpdate);
|
||||
const displayCount = Number(counts.displayCount);
|
||||
const contactCount = Number(counts.contactCount);
|
||||
const segmentCount = Number(counts.segmentCount);
|
||||
const newestResponse = counts.newestResponseAt ? { createdAt: counts.newestResponseAt } : null;
|
||||
|
||||
// Convert integration array to boolean map indicating which integrations are configured.
|
||||
const integrationMap = {
|
||||
notion: integrations.some((i) => i.type === IntegrationType.notion),
|
||||
googleSheets: integrations.some((i) => i.type === IntegrationType.googleSheets),
|
||||
airtable: integrations.some((i) => i.type === IntegrationType.airtable),
|
||||
slack: integrations.some((i) => i.type === IntegrationType.slack),
|
||||
};
|
||||
|
||||
// Check SSO configuration: either via environment variables or database records.
|
||||
// This detects which SSO providers are available/configured.
|
||||
const ssoMap = {
|
||||
github: !!env.GITHUB_ID || ssoProviders.some((p) => p.provider === "github"),
|
||||
google: !!env.GOOGLE_CLIENT_ID || ssoProviders.some((p) => p.provider === "google"),
|
||||
azureAd: !!env.AZUREAD_CLIENT_ID || ssoProviders.some((p) => p.provider === "azuread"),
|
||||
oidc: !!env.OIDC_CLIENT_ID || ssoProviders.some((p) => p.provider === "openid"),
|
||||
};
|
||||
|
||||
// Construct telemetry payload with usage statistics and configuration.
|
||||
const payload = {
|
||||
schemaVersion: 1, // Schema version for future compatibility
|
||||
// Core entity counts
|
||||
organizationCount,
|
||||
userCount,
|
||||
teamCount,
|
||||
projectCount,
|
||||
surveyCount,
|
||||
inProgressSurveyCount,
|
||||
completedSurveyCount,
|
||||
// Response metrics
|
||||
responseCountAllTime,
|
||||
responseCountSinceLastUsageUpdate: responseCountSinceLastUpdate, // Incremental since last telemetry
|
||||
displayCount,
|
||||
contactCount,
|
||||
segmentCount,
|
||||
integrations: integrationMap,
|
||||
infrastructure: {
|
||||
smtp: !!env.SMTP_HOST,
|
||||
s3: !!env.S3_BUCKET_NAME,
|
||||
prometheus: !!env.PROMETHEUS_ENABLED,
|
||||
},
|
||||
security: {
|
||||
recaptcha: !!(env.RECAPTCHA_SITE_KEY && env.RECAPTCHA_SECRET_KEY),
|
||||
},
|
||||
sso: ssoMap,
|
||||
meta: {
|
||||
version: packageJson.version, // Formbricks version for compatibility tracking
|
||||
},
|
||||
temporal: {
|
||||
instanceCreatedAt: oldestOrg.createdAt.toISOString(), // When instance was first created
|
||||
newestResponseAt: newestResponse?.createdAt.toISOString() || null, // Most recent activity
|
||||
},
|
||||
};
|
||||
|
||||
// Send telemetry to Formbricks Enterprise endpoint.
|
||||
// This endpoint collects usage statistics for enterprise license validation and analytics.
|
||||
const url = `https://ee.formbricks.com/api/v1/instances/${instanceId}/usage-updates`;
|
||||
|
||||
const controller = new AbortController();
|
||||
const timeout = setTimeout(() => controller.abort(), 10000); // 10 second timeout
|
||||
|
||||
await fetch(url, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
body: JSON.stringify(payload),
|
||||
signal: controller.signal,
|
||||
});
|
||||
|
||||
clearTimeout(timeout);
|
||||
};
|
||||
@@ -3,6 +3,7 @@ import { headers } from "next/headers";
|
||||
import { prisma } from "@formbricks/database";
|
||||
import { logger } from "@formbricks/logger";
|
||||
import { ResourceNotFoundError } from "@formbricks/types/errors";
|
||||
import { sendTelemetryEvents } from "@/app/api/(internal)/pipeline/lib/telemetry";
|
||||
import { ZPipelineInput } from "@/app/api/(internal)/pipeline/types/pipelines";
|
||||
import { responses } from "@/app/lib/api/response";
|
||||
import { transformErrorToDetails } from "@/app/lib/api/validator";
|
||||
@@ -226,6 +227,10 @@ export const POST = async (request: Request) => {
|
||||
}
|
||||
});
|
||||
}
|
||||
if (event === "responseCreated") {
|
||||
// Send telemetry events
|
||||
await sendTelemetryEvents();
|
||||
}
|
||||
|
||||
return Response.json({ data: {} });
|
||||
};
|
||||
|
||||
60
packages/cache/src/service.ts
vendored
60
packages/cache/src/service.ts
vendored
@@ -3,7 +3,7 @@ import type { RedisClient } from "@/types/client";
|
||||
import { type CacheError, CacheErrorClass, ErrorCode, type Result, err, ok } from "@/types/error";
|
||||
import type { CacheKey } from "@/types/keys";
|
||||
import { ZCacheKey } from "@/types/keys";
|
||||
import { ZTtlMs } from "@/types/service";
|
||||
import { ZTtlMs, ZTtlMsOptional } from "@/types/service";
|
||||
import { validateInputs } from "./utils/validation";
|
||||
|
||||
/**
|
||||
@@ -116,13 +116,13 @@ export class CacheService {
|
||||
}
|
||||
|
||||
/**
|
||||
* Set a value in cache with automatic JSON serialization and TTL
|
||||
* Set a value in cache with automatic JSON serialization and optional TTL
|
||||
* @param key - Cache key to store under
|
||||
* @param value - Value to store
|
||||
* @param ttlMs - Time to live in milliseconds
|
||||
* @param ttlMs - Time to live in milliseconds (optional - if omitted, key persists indefinitely)
|
||||
* @returns Result containing void or an error
|
||||
*/
|
||||
async set(key: CacheKey, value: unknown, ttlMs: number): Promise<Result<void, CacheError>> {
|
||||
async set(key: CacheKey, value: unknown, ttlMs?: number): Promise<Result<void, CacheError>> {
|
||||
// Check Redis availability first
|
||||
if (!this.isRedisClientReady()) {
|
||||
return err({
|
||||
@@ -130,8 +130,8 @@ export class CacheService {
|
||||
});
|
||||
}
|
||||
|
||||
// Validate both key and TTL in one call
|
||||
const validation = validateInputs([key, ZCacheKey], [ttlMs, ZTtlMs]);
|
||||
// Validate key and optional TTL
|
||||
const validation = validateInputs([key, ZCacheKey], [ttlMs, ZTtlMsOptional]);
|
||||
if (!validation.ok) {
|
||||
return validation;
|
||||
}
|
||||
@@ -141,7 +141,13 @@ export class CacheService {
|
||||
const normalizedValue = value === undefined ? null : value;
|
||||
const serialized = JSON.stringify(normalizedValue);
|
||||
|
||||
await this.withTimeout(this.redis.setEx(key, Math.floor(ttlMs / 1000), serialized));
|
||||
if (ttlMs === undefined) {
|
||||
// Set without expiration (persists indefinitely)
|
||||
await this.withTimeout(this.redis.set(key, serialized));
|
||||
} else {
|
||||
// Set with expiration
|
||||
await this.withTimeout(this.redis.setEx(key, Math.floor(ttlMs / 1000), serialized));
|
||||
}
|
||||
return ok(undefined);
|
||||
} catch (error) {
|
||||
logger.error({ error, key, ttlMs }, "Cache set operation failed");
|
||||
@@ -185,6 +191,44 @@ export class CacheService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to acquire a distributed lock (atomic SET NX operation)
|
||||
* @param key - Lock key
|
||||
* @param value - Lock value (typically "locked" or instance identifier)
|
||||
* @param ttlMs - Time to live in milliseconds (lock expiration)
|
||||
* @returns Result containing boolean indicating if lock was acquired, or an error
|
||||
*/
|
||||
async tryLock(key: CacheKey, value: string, ttlMs: number): Promise<Result<boolean, CacheError>> {
|
||||
// Check Redis availability first
|
||||
if (!this.isRedisClientReady()) {
|
||||
return err({
|
||||
code: ErrorCode.RedisConnectionError,
|
||||
});
|
||||
}
|
||||
|
||||
const validation = validateInputs([key, ZCacheKey], [ttlMs, ZTtlMs]);
|
||||
if (!validation.ok) {
|
||||
return validation;
|
||||
}
|
||||
|
||||
try {
|
||||
// Use SET with NX (only set if not exists) and PX (expiration in milliseconds) for atomic lock acquisition
|
||||
const result = await this.withTimeout(
|
||||
this.redis.set(key, value, {
|
||||
NX: true,
|
||||
PX: ttlMs,
|
||||
})
|
||||
);
|
||||
// SET returns "OK" if lock was acquired, null if key already exists
|
||||
return ok(result === "OK");
|
||||
} catch (error) {
|
||||
logger.error({ error, key, ttlMs }, "Cache lock operation failed");
|
||||
return err({
|
||||
code: ErrorCode.RedisOperationError,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Cache wrapper for functions (cache-aside).
|
||||
* Never throws due to cache errors; function errors propagate without retry.
|
||||
@@ -243,7 +287,7 @@ export class CacheService {
|
||||
}
|
||||
|
||||
private async trySetCache(key: CacheKey, value: unknown, ttlMs: number): Promise<void> {
|
||||
if (typeof value === "undefined") {
|
||||
if (value === undefined) {
|
||||
return; // Skip caching undefined values
|
||||
}
|
||||
|
||||
|
||||
7
packages/cache/types/service.ts
vendored
7
packages/cache/types/service.ts
vendored
@@ -5,3 +5,10 @@ export const ZTtlMs = z
|
||||
.int()
|
||||
.min(1000, "TTL must be at least 1000ms (1 second)")
|
||||
.finite("TTL must be finite");
|
||||
|
||||
export const ZTtlMsOptional = z
|
||||
.number()
|
||||
.int()
|
||||
.min(1000, "TTL must be at least 1000ms (1 second)")
|
||||
.finite("TTL must be finite")
|
||||
.optional();
|
||||
|
||||
Reference in New Issue
Block a user