mirror of
https://github.com/formbricks/formbricks.git
synced 2025-12-24 07:00:34 -06:00
Compare commits
7 Commits
fix/block-
...
4.3.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1f0042b55c | ||
|
|
eb14cc0593 | ||
|
|
b64beb83ad | ||
|
|
018cef61a6 | ||
|
|
c53e4f54cb | ||
|
|
e2fd71abfd | ||
|
|
f888aa8a19 |
272
apps/web/app/api/(internal)/pipeline/lib/telemetry.test.ts
Normal file
272
apps/web/app/api/(internal)/pipeline/lib/telemetry.test.ts
Normal file
@@ -0,0 +1,272 @@
|
||||
import { IntegrationType } from "@prisma/client";
|
||||
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
|
||||
import { getCacheService } from "@formbricks/cache";
|
||||
import { prisma } from "@formbricks/database";
|
||||
import { logger } from "@formbricks/logger";
|
||||
import { sendTelemetryEvents } from "./telemetry";
|
||||
|
||||
// Mock dependencies
|
||||
vi.mock("@formbricks/cache");
|
||||
vi.mock("@formbricks/database", () => ({
|
||||
prisma: {
|
||||
organization: {
|
||||
findFirst: vi.fn(),
|
||||
count: vi.fn(),
|
||||
},
|
||||
user: { count: vi.fn() },
|
||||
team: { count: vi.fn() },
|
||||
project: { count: vi.fn() },
|
||||
survey: { count: vi.fn() },
|
||||
response: {
|
||||
count: vi.fn(),
|
||||
findFirst: vi.fn(),
|
||||
},
|
||||
display: { count: vi.fn() },
|
||||
contact: { count: vi.fn() },
|
||||
segment: { count: vi.fn() },
|
||||
integration: { findMany: vi.fn() },
|
||||
account: { findMany: vi.fn() },
|
||||
$queryRaw: vi.fn(),
|
||||
},
|
||||
}));
|
||||
vi.mock("@formbricks/logger", () => ({
|
||||
logger: {
|
||||
error: vi.fn(),
|
||||
info: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
debug: vi.fn(),
|
||||
},
|
||||
}));
|
||||
vi.mock("@/lib/env", () => ({
|
||||
env: {
|
||||
SMTP_HOST: "smtp.example.com",
|
||||
S3_BUCKET_NAME: "my-bucket",
|
||||
PROMETHEUS_ENABLED: true,
|
||||
RECAPTCHA_SITE_KEY: "site-key",
|
||||
RECAPTCHA_SECRET_KEY: "secret-key",
|
||||
GITHUB_ID: "github-id",
|
||||
},
|
||||
}));
|
||||
|
||||
// Mock fetch
|
||||
const fetchMock = vi.fn();
|
||||
globalThis.fetch = fetchMock;
|
||||
|
||||
const mockCacheService = {
|
||||
get: vi.fn(),
|
||||
set: vi.fn(),
|
||||
tryLock: vi.fn(),
|
||||
del: vi.fn(),
|
||||
};
|
||||
|
||||
describe("sendTelemetryEvents", () => {
|
||||
beforeEach(() => {
|
||||
vi.resetAllMocks();
|
||||
vi.useFakeTimers();
|
||||
// Set a fixed time far in the past to ensure we can always send telemetry
|
||||
vi.setSystemTime(new Date("2024-01-01T00:00:00.000Z"));
|
||||
|
||||
// Setup default cache behavior
|
||||
vi.mocked(getCacheService).mockResolvedValue({
|
||||
ok: true,
|
||||
data: mockCacheService as any,
|
||||
});
|
||||
mockCacheService.tryLock.mockResolvedValue({ ok: true, data: true }); // Lock acquired
|
||||
mockCacheService.del.mockResolvedValue({ ok: true, data: undefined });
|
||||
mockCacheService.get.mockResolvedValue({ ok: true, data: null }); // No last sent time
|
||||
mockCacheService.set.mockResolvedValue({ ok: true, data: undefined });
|
||||
|
||||
// Setup default prisma behavior
|
||||
vi.mocked(prisma.organization.findFirst).mockResolvedValue({
|
||||
id: "org-123",
|
||||
createdAt: new Date("2023-01-01"),
|
||||
} as any);
|
||||
|
||||
// Mock raw SQL query for counts (batched query)
|
||||
vi.mocked(prisma.$queryRaw).mockResolvedValue([
|
||||
{
|
||||
organizationCount: BigInt(1),
|
||||
userCount: BigInt(5),
|
||||
teamCount: BigInt(2),
|
||||
projectCount: BigInt(3),
|
||||
surveyCount: BigInt(10),
|
||||
inProgressSurveyCount: BigInt(4),
|
||||
completedSurveyCount: BigInt(6),
|
||||
responseCountAllTime: BigInt(100),
|
||||
responseCountSinceLastUpdate: BigInt(10),
|
||||
displayCount: BigInt(50),
|
||||
contactCount: BigInt(20),
|
||||
segmentCount: BigInt(4),
|
||||
newestResponseAt: new Date("2024-01-01T00:00:00.000Z"),
|
||||
},
|
||||
] as any);
|
||||
|
||||
// Mock other queries
|
||||
vi.mocked(prisma.integration.findMany).mockResolvedValue([{ type: IntegrationType.notion }] as any);
|
||||
vi.mocked(prisma.account.findMany).mockResolvedValue([{ provider: "github" }] as any);
|
||||
|
||||
fetchMock.mockResolvedValue({ ok: true });
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
test("should send telemetry successfully when conditions are met", async () => {
|
||||
await sendTelemetryEvents();
|
||||
|
||||
// Check lock acquisition
|
||||
expect(mockCacheService.tryLock).toHaveBeenCalledWith(
|
||||
"telemetry_lock",
|
||||
"locked",
|
||||
60 * 1000 // 1 minute TTL
|
||||
);
|
||||
|
||||
// Check data gathering
|
||||
expect(prisma.organization.findFirst).toHaveBeenCalled();
|
||||
expect(prisma.$queryRaw).toHaveBeenCalled();
|
||||
|
||||
// Check fetch call
|
||||
expect(fetchMock).toHaveBeenCalledTimes(1);
|
||||
const payload = JSON.parse(fetchMock.mock.calls[0][1].body);
|
||||
expect(payload.organizationCount).toBe(1);
|
||||
expect(payload.userCount).toBe(5);
|
||||
expect(payload.integrations.notion).toBe(true);
|
||||
expect(payload.sso.github).toBe(true);
|
||||
|
||||
// Check cache update (no TTL parameter)
|
||||
expect(mockCacheService.set).toHaveBeenCalledWith("telemetry_last_sent_ts", expect.any(String));
|
||||
|
||||
// Check lock release
|
||||
expect(mockCacheService.del).toHaveBeenCalledWith(["telemetry_lock"]);
|
||||
});
|
||||
|
||||
test("should skip if in-memory check fails", async () => {
|
||||
// Run once to set nextTelemetryCheck
|
||||
await sendTelemetryEvents();
|
||||
vi.clearAllMocks();
|
||||
|
||||
// Run again immediately (should fail in-memory check)
|
||||
await sendTelemetryEvents();
|
||||
|
||||
expect(getCacheService).not.toHaveBeenCalled();
|
||||
expect(fetchMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
test("should skip if Redis last sent time is recent", async () => {
|
||||
// Mock last sent time as recent
|
||||
const recentTime = Date.now() - 1000 * 60 * 60; // 1 hour ago
|
||||
mockCacheService.get.mockResolvedValue({ ok: true, data: String(recentTime) });
|
||||
|
||||
await sendTelemetryEvents();
|
||||
|
||||
expect(mockCacheService.tryLock).not.toHaveBeenCalled(); // No lock attempt
|
||||
expect(fetchMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
test("should skip if lock cannot be acquired", async () => {
|
||||
mockCacheService.tryLock.mockResolvedValue({ ok: true, data: false }); // Lock not acquired
|
||||
|
||||
await sendTelemetryEvents();
|
||||
|
||||
expect(fetchMock).not.toHaveBeenCalled();
|
||||
expect(mockCacheService.del).not.toHaveBeenCalled(); // Shouldn't try to delete lock we didn't acquire
|
||||
});
|
||||
|
||||
test("should handle cache service failure gracefully", async () => {
|
||||
vi.mocked(getCacheService).mockResolvedValue({
|
||||
ok: false,
|
||||
error: new Error("Cache error"),
|
||||
} as any);
|
||||
|
||||
await sendTelemetryEvents();
|
||||
|
||||
expect(fetchMock).not.toHaveBeenCalled();
|
||||
// Should verify that nextTelemetryCheck was updated, but it's a module variable.
|
||||
// We can infer it by running again and checking calls
|
||||
vi.clearAllMocks();
|
||||
await sendTelemetryEvents();
|
||||
expect(getCacheService).not.toHaveBeenCalled(); // Should be blocked by in-memory check
|
||||
});
|
||||
|
||||
test("should handle telemetry send failure and apply cooldown", async () => {
|
||||
// Reset module to clear nextTelemetryCheck state from previous tests
|
||||
vi.resetModules();
|
||||
const { sendTelemetryEvents: freshSendTelemetryEvents } = await import("./telemetry");
|
||||
|
||||
// Ensure we can acquire lock by setting last sent time far in the past
|
||||
const oldTime = Date.now() - 25 * 60 * 60 * 1000; // 25 hours ago
|
||||
mockCacheService.get.mockResolvedValue({ ok: true, data: String(oldTime) });
|
||||
mockCacheService.tryLock.mockResolvedValue({ ok: true, data: true }); // Lock acquired
|
||||
|
||||
// Make fetch fail to trigger the catch block
|
||||
const networkError = new Error("Network error");
|
||||
fetchMock.mockRejectedValue(networkError);
|
||||
|
||||
await freshSendTelemetryEvents();
|
||||
|
||||
// Verify lock was acquired
|
||||
expect(mockCacheService.tryLock).toHaveBeenCalledWith("telemetry_lock", "locked", 60 * 1000);
|
||||
|
||||
// The error should be caught in the inner catch block
|
||||
// The actual implementation logs as warning, not error
|
||||
expect(logger.warn).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
error: networkError,
|
||||
message: "Network error",
|
||||
}),
|
||||
"Failed to send telemetry - applying 1h cooldown"
|
||||
);
|
||||
|
||||
// Lock should be released in finally block
|
||||
expect(mockCacheService.del).toHaveBeenCalledWith(["telemetry_lock"]);
|
||||
|
||||
// Cache should not be updated on failure
|
||||
expect(mockCacheService.set).not.toHaveBeenCalled();
|
||||
|
||||
// Verify cooldown: run again immediately (should be blocked by in-memory check)
|
||||
vi.clearAllMocks();
|
||||
mockCacheService.get.mockResolvedValue({ ok: true, data: null });
|
||||
mockCacheService.tryLock.mockResolvedValue({ ok: true, data: true });
|
||||
await freshSendTelemetryEvents();
|
||||
expect(getCacheService).not.toHaveBeenCalled(); // Should be blocked by in-memory check
|
||||
});
|
||||
|
||||
test("should skip if no organization exists", async () => {
|
||||
// Reset module to clear nextTelemetryCheck state from previous tests
|
||||
vi.resetModules();
|
||||
const { sendTelemetryEvents: freshSendTelemetryEvents } = await import("./telemetry");
|
||||
|
||||
// Ensure we can acquire lock by setting last sent time far in the past
|
||||
const oldTime = Date.now() - 25 * 60 * 60 * 1000; // 25 hours ago
|
||||
|
||||
// Re-setup mocks after resetModules
|
||||
vi.mocked(getCacheService).mockResolvedValue({
|
||||
ok: true,
|
||||
data: mockCacheService as any,
|
||||
});
|
||||
mockCacheService.tryLock.mockResolvedValue({ ok: true, data: true }); // Lock acquired
|
||||
mockCacheService.del.mockResolvedValue({ ok: true, data: undefined });
|
||||
mockCacheService.get.mockResolvedValue({ ok: true, data: String(oldTime) });
|
||||
mockCacheService.set.mockResolvedValue({ ok: true, data: undefined });
|
||||
|
||||
vi.mocked(prisma.organization.findFirst).mockResolvedValue(null);
|
||||
|
||||
await freshSendTelemetryEvents();
|
||||
|
||||
// sendTelemetry returns early when no org exists
|
||||
// Since it returns (not throws), the try block completes successfully
|
||||
// Then cache.set is called, and finally block executes
|
||||
expect(fetchMock).not.toHaveBeenCalled();
|
||||
|
||||
// Verify lock was acquired (prerequisite for finally block to execute)
|
||||
expect(mockCacheService.tryLock).toHaveBeenCalledWith("telemetry_lock", "locked", 60 * 1000);
|
||||
|
||||
// Lock should be released in finally block
|
||||
expect(mockCacheService.del).toHaveBeenCalledWith(["telemetry_lock"]);
|
||||
|
||||
// Note: The current implementation calls cache.set even when no org exists
|
||||
// This might be a bug, but we test the actual behavior
|
||||
expect(mockCacheService.set).toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
273
apps/web/app/api/(internal)/pipeline/lib/telemetry.ts
Normal file
273
apps/web/app/api/(internal)/pipeline/lib/telemetry.ts
Normal file
@@ -0,0 +1,273 @@
|
||||
import { IntegrationType } from "@prisma/client";
|
||||
import { createHash } from "node:crypto";
|
||||
import { type CacheKey, getCacheService } from "@formbricks/cache";
|
||||
import { prisma } from "@formbricks/database";
|
||||
import { logger } from "@formbricks/logger";
|
||||
import { env } from "@/lib/env";
|
||||
import packageJson from "@/package.json";
|
||||
|
||||
const TELEMETRY_INTERVAL_MS = 24 * 60 * 60 * 1000; // 24 hours
|
||||
const TELEMETRY_LOCK_KEY = "telemetry_lock" as CacheKey;
|
||||
const TELEMETRY_LAST_SENT_KEY = "telemetry_last_sent_ts" as CacheKey;
|
||||
|
||||
/**
|
||||
* In-memory timestamp for the next telemetry check.
|
||||
* This is a fast, process-local check to avoid unnecessary Redis calls.
|
||||
* Updated after each check to prevent redundant executions.
|
||||
*/
|
||||
let nextTelemetryCheck = 0;
|
||||
|
||||
/**
|
||||
* Sends telemetry events to Formbricks Enterprise endpoint.
|
||||
* Uses a three-layer check system to prevent duplicate submissions:
|
||||
* 1. In-memory check (fast, process-local)
|
||||
* 2. Redis check (shared across instances, persists across restarts)
|
||||
* 3. Distributed lock (prevents concurrent execution in multi-instance deployments)
|
||||
*/
|
||||
export const sendTelemetryEvents = async () => {
|
||||
try {
|
||||
const now = Date.now();
|
||||
|
||||
// ============================================================
|
||||
// CHECK 1: In-Memory Check (Fast Path)
|
||||
// ============================================================
|
||||
// Purpose: Quick process-local check to avoid Redis calls if we recently checked.
|
||||
// How it works: If current time is before nextTelemetryCheck, skip entirely.
|
||||
// This is updated after each successful check or failure to prevent spam.
|
||||
if (now < nextTelemetryCheck) {
|
||||
return;
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
// CHECK 2: Redis Check (Shared State)
|
||||
// ============================================================
|
||||
// Purpose: Check if telemetry was sent recently by ANY instance (shared across cluster).
|
||||
// This persists across restarts and works in multi-instance deployments.
|
||||
|
||||
const cacheServiceResult = await getCacheService();
|
||||
if (!cacheServiceResult.ok) {
|
||||
// Redis unavailable: Fallback to in-memory cooldown to avoid spamming.
|
||||
// Wait 1 hour before trying again. This prevents hammering Redis when it's down.
|
||||
nextTelemetryCheck = now + 60 * 60 * 1000;
|
||||
return;
|
||||
}
|
||||
const cache = cacheServiceResult.data;
|
||||
|
||||
// Get the timestamp of when telemetry was last sent (from any instance).
|
||||
const lastSentResult = await cache.get(TELEMETRY_LAST_SENT_KEY);
|
||||
const lastSentStr = lastSentResult.ok && lastSentResult.data ? (lastSentResult.data as string) : null;
|
||||
const lastSent = lastSentStr ? Number.parseInt(lastSentStr, 10) : 0;
|
||||
|
||||
// If less than 24 hours have passed since last telemetry, skip.
|
||||
// Update in-memory check to match remaining time for fast-path optimization.
|
||||
if (now - lastSent < TELEMETRY_INTERVAL_MS) {
|
||||
nextTelemetryCheck = lastSent + TELEMETRY_INTERVAL_MS;
|
||||
return;
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
// CHECK 3: Distributed Lock (Prevent Concurrent Execution)
|
||||
// ============================================================
|
||||
// Purpose: Ensure only ONE instance executes telemetry at a time in a cluster.
|
||||
// How it works:
|
||||
// - Uses Redis SET NX (only set if not exists) for atomic lock acquisition
|
||||
// - Lock expires after 1 minute (TTL) to prevent deadlocks if instance crashes
|
||||
// - If lock exists, another instance is already running telemetry, so we exit
|
||||
// - Lock is released in finally block after telemetry completes or fails
|
||||
const lockResult = await cache.tryLock(TELEMETRY_LOCK_KEY, "locked", 60 * 1000); // 1 minute TTL
|
||||
|
||||
if (!lockResult.ok || !lockResult.data) {
|
||||
// Lock acquisition failed or already held by another instance.
|
||||
// Exit silently - the other instance will handle telemetry.
|
||||
// No need to update nextTelemetryCheck here since we didn't execute.
|
||||
return;
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
// EXECUTION: Send Telemetry
|
||||
// ============================================================
|
||||
// We've passed all checks and acquired the lock. Now execute telemetry.
|
||||
try {
|
||||
await sendTelemetry(lastSent);
|
||||
|
||||
// Success: Update Redis with current timestamp so other instances know telemetry was sent.
|
||||
// No TTL - persists indefinitely to support low-volume instances (responses every few days/weeks).
|
||||
await cache.set(TELEMETRY_LAST_SENT_KEY, now.toString());
|
||||
|
||||
// Update in-memory check to prevent this instance from checking again for 24h.
|
||||
nextTelemetryCheck = now + TELEMETRY_INTERVAL_MS;
|
||||
} catch (e) {
|
||||
// Log as warning since telemetry is non-essential
|
||||
const errorMessage = e instanceof Error ? e.message : String(e);
|
||||
logger.warn(
|
||||
{ error: e, message: errorMessage, lastSent, now },
|
||||
"Failed to send telemetry - applying 1h cooldown"
|
||||
);
|
||||
|
||||
// Failure cooldown: Prevent retrying immediately to avoid hammering the endpoint.
|
||||
// Wait 1 hour before allowing this instance to try again.
|
||||
// Note: Other instances can still try (they'll hit the lock or Redis check).
|
||||
nextTelemetryCheck = now + 60 * 60 * 1000;
|
||||
} finally {
|
||||
// Always release the lock, even if telemetry failed.
|
||||
// This allows other instances to retry if this one failed.
|
||||
await cache.del([TELEMETRY_LOCK_KEY]);
|
||||
}
|
||||
} catch (error) {
|
||||
// Catch-all for any unexpected errors in the wrapper logic (cache failures, lock issues, etc.)
|
||||
// Log as warning since telemetry is non-essential functionality
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
logger.warn(
|
||||
{ error, message: errorMessage, timestamp: Date.now() },
|
||||
"Unexpected error in sendTelemetryEvents wrapper - telemetry check skipped"
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Gathers telemetry data and sends it to Formbricks Enterprise endpoint.
|
||||
* @param lastSent - Timestamp of last telemetry send (used to calculate incremental metrics)
|
||||
*/
|
||||
const sendTelemetry = async (lastSent: number) => {
|
||||
// Get the oldest organization to generate a stable, anonymized instance ID.
|
||||
// Using the oldest org ensures the ID doesn't change over time.
|
||||
const oldestOrg = await prisma.organization.findFirst({
|
||||
orderBy: { createdAt: "asc" },
|
||||
select: { id: true, createdAt: true },
|
||||
});
|
||||
|
||||
if (!oldestOrg) return; // No organization exists, nothing to report
|
||||
const instanceId = createHash("sha256").update(oldestOrg.id).digest("hex");
|
||||
|
||||
// Optimize database queries to reduce connection pool usage:
|
||||
// Instead of 15 parallel queries (which could exhaust the connection pool),
|
||||
// we batch all count queries into a single raw SQL query.
|
||||
// This reduces connection usage from 15 → 3 (batch counts + integrations + accounts).
|
||||
const [countsResult, integrations, ssoProviders] = await Promise.all([
|
||||
// Single query for all counts (13 metrics in one round-trip)
|
||||
prisma.$queryRaw<
|
||||
[
|
||||
{
|
||||
organizationCount: bigint;
|
||||
userCount: bigint;
|
||||
teamCount: bigint;
|
||||
projectCount: bigint;
|
||||
surveyCount: bigint;
|
||||
inProgressSurveyCount: bigint;
|
||||
completedSurveyCount: bigint;
|
||||
responseCountAllTime: bigint;
|
||||
responseCountSinceLastUpdate: bigint;
|
||||
displayCount: bigint;
|
||||
contactCount: bigint;
|
||||
segmentCount: bigint;
|
||||
newestResponseAt: Date | null;
|
||||
},
|
||||
]
|
||||
>`
|
||||
SELECT
|
||||
(SELECT COUNT(*) FROM "Organization") as "organizationCount",
|
||||
(SELECT COUNT(*) FROM "User") as "userCount",
|
||||
(SELECT COUNT(*) FROM "Team") as "teamCount",
|
||||
(SELECT COUNT(*) FROM "Project") as "projectCount",
|
||||
(SELECT COUNT(*) FROM "Survey") as "surveyCount",
|
||||
(SELECT COUNT(*) FROM "Survey" WHERE status = 'inProgress') as "inProgressSurveyCount",
|
||||
(SELECT COUNT(*) FROM "Survey" WHERE status = 'completed') as "completedSurveyCount",
|
||||
(SELECT COUNT(*) FROM "Response") as "responseCountAllTime",
|
||||
(SELECT COUNT(*) FROM "Response" WHERE "created_at" > ${new Date(lastSent || 0)}) as "responseCountSinceLastUpdate",
|
||||
(SELECT COUNT(*) FROM "Display") as "displayCount",
|
||||
(SELECT COUNT(*) FROM "Contact") as "contactCount",
|
||||
(SELECT COUNT(*) FROM "Segment") as "segmentCount",
|
||||
(SELECT MAX("created_at") FROM "Response") as "newestResponseAt"
|
||||
`,
|
||||
// Keep these as separate queries since they need DISTINCT which is harder to optimize
|
||||
prisma.integration.findMany({ select: { type: true }, distinct: ["type"] }),
|
||||
prisma.account.findMany({ select: { provider: true }, distinct: ["provider"] }),
|
||||
]);
|
||||
|
||||
// Extract metrics from the batched query result and convert bigints to numbers
|
||||
const counts = countsResult[0];
|
||||
const organizationCount = Number(counts.organizationCount);
|
||||
const userCount = Number(counts.userCount);
|
||||
const teamCount = Number(counts.teamCount);
|
||||
const projectCount = Number(counts.projectCount);
|
||||
const surveyCount = Number(counts.surveyCount);
|
||||
const inProgressSurveyCount = Number(counts.inProgressSurveyCount);
|
||||
const completedSurveyCount = Number(counts.completedSurveyCount);
|
||||
const responseCountAllTime = Number(counts.responseCountAllTime);
|
||||
const responseCountSinceLastUpdate = Number(counts.responseCountSinceLastUpdate);
|
||||
const displayCount = Number(counts.displayCount);
|
||||
const contactCount = Number(counts.contactCount);
|
||||
const segmentCount = Number(counts.segmentCount);
|
||||
const newestResponse = counts.newestResponseAt ? { createdAt: counts.newestResponseAt } : null;
|
||||
|
||||
// Convert integration array to boolean map indicating which integrations are configured.
|
||||
const integrationMap = {
|
||||
notion: integrations.some((i) => i.type === IntegrationType.notion),
|
||||
googleSheets: integrations.some((i) => i.type === IntegrationType.googleSheets),
|
||||
airtable: integrations.some((i) => i.type === IntegrationType.airtable),
|
||||
slack: integrations.some((i) => i.type === IntegrationType.slack),
|
||||
};
|
||||
|
||||
// Check SSO configuration: either via environment variables or database records.
|
||||
// This detects which SSO providers are available/configured.
|
||||
const ssoMap = {
|
||||
github: !!env.GITHUB_ID || ssoProviders.some((p) => p.provider === "github"),
|
||||
google: !!env.GOOGLE_CLIENT_ID || ssoProviders.some((p) => p.provider === "google"),
|
||||
azureAd: !!env.AZUREAD_CLIENT_ID || ssoProviders.some((p) => p.provider === "azuread"),
|
||||
oidc: !!env.OIDC_CLIENT_ID || ssoProviders.some((p) => p.provider === "openid"),
|
||||
};
|
||||
|
||||
// Construct telemetry payload with usage statistics and configuration.
|
||||
const payload = {
|
||||
schemaVersion: 1, // Schema version for future compatibility
|
||||
// Core entity counts
|
||||
organizationCount,
|
||||
userCount,
|
||||
teamCount,
|
||||
projectCount,
|
||||
surveyCount,
|
||||
inProgressSurveyCount,
|
||||
completedSurveyCount,
|
||||
// Response metrics
|
||||
responseCountAllTime,
|
||||
responseCountSinceLastUsageUpdate: responseCountSinceLastUpdate, // Incremental since last telemetry
|
||||
displayCount,
|
||||
contactCount,
|
||||
segmentCount,
|
||||
integrations: integrationMap,
|
||||
infrastructure: {
|
||||
smtp: !!env.SMTP_HOST,
|
||||
s3: !!env.S3_BUCKET_NAME,
|
||||
prometheus: !!env.PROMETHEUS_ENABLED,
|
||||
},
|
||||
security: {
|
||||
recaptcha: !!(env.RECAPTCHA_SITE_KEY && env.RECAPTCHA_SECRET_KEY),
|
||||
},
|
||||
sso: ssoMap,
|
||||
meta: {
|
||||
version: packageJson.version, // Formbricks version for compatibility tracking
|
||||
},
|
||||
temporal: {
|
||||
instanceCreatedAt: oldestOrg.createdAt.toISOString(), // When instance was first created
|
||||
newestResponseAt: newestResponse?.createdAt.toISOString() || null, // Most recent activity
|
||||
},
|
||||
};
|
||||
|
||||
// Send telemetry to Formbricks Enterprise endpoint.
|
||||
// This endpoint collects usage statistics for enterprise license validation and analytics.
|
||||
const url = `https://ee.formbricks.com/api/v1/instances/${instanceId}/usage-updates`;
|
||||
|
||||
const controller = new AbortController();
|
||||
const timeout = setTimeout(() => controller.abort(), 10000); // 10 second timeout
|
||||
|
||||
await fetch(url, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
body: JSON.stringify(payload),
|
||||
signal: controller.signal,
|
||||
});
|
||||
|
||||
clearTimeout(timeout);
|
||||
};
|
||||
@@ -3,6 +3,7 @@ import { headers } from "next/headers";
|
||||
import { prisma } from "@formbricks/database";
|
||||
import { logger } from "@formbricks/logger";
|
||||
import { ResourceNotFoundError } from "@formbricks/types/errors";
|
||||
import { sendTelemetryEvents } from "@/app/api/(internal)/pipeline/lib/telemetry";
|
||||
import { ZPipelineInput } from "@/app/api/(internal)/pipeline/types/pipelines";
|
||||
import { responses } from "@/app/lib/api/response";
|
||||
import { transformErrorToDetails } from "@/app/lib/api/validator";
|
||||
@@ -226,6 +227,10 @@ export const POST = async (request: Request) => {
|
||||
}
|
||||
});
|
||||
}
|
||||
if (event === "responseCreated") {
|
||||
// Send telemetry events
|
||||
await sendTelemetryEvents();
|
||||
}
|
||||
|
||||
return Response.json({ data: {} });
|
||||
};
|
||||
|
||||
@@ -1109,10 +1109,11 @@ const reviewPrompt = (t: TFunction): TTemplate => {
|
||||
required: false,
|
||||
buttonUrl: "https://formbricks.com/github",
|
||||
buttonExternal: true,
|
||||
ctaButtonLabel: t("templates.review_prompt_question_2_button_label"),
|
||||
}),
|
||||
],
|
||||
logic: [createBlockJumpLogic(reusableElementIds[1], localSurvey.endings[0].id, "isClicked")],
|
||||
buttonLabel: t("templates.review_prompt_question_2_button_label"),
|
||||
buttonLabel: t("templates.next"),
|
||||
backButtonLabel: t("templates.back"),
|
||||
t,
|
||||
}),
|
||||
@@ -1159,9 +1160,10 @@ const interviewPrompt = (t: TFunction): TTemplate => {
|
||||
buttonUrl: "https://cal.com/johannes",
|
||||
buttonExternal: true,
|
||||
required: false,
|
||||
ctaButtonLabel: t("templates.interview_prompt_question_1_button_label"),
|
||||
}),
|
||||
],
|
||||
buttonLabel: t("templates.interview_prompt_question_1_button_label"),
|
||||
buttonLabel: t("templates.next"),
|
||||
t,
|
||||
}),
|
||||
],
|
||||
@@ -2694,9 +2696,10 @@ const marketSiteClarity = (t: TFunction): TTemplate => {
|
||||
required: false,
|
||||
buttonUrl: "https://app.formbricks.com/auth/signup",
|
||||
buttonExternal: true,
|
||||
ctaButtonLabel: t("templates.market_site_clarity_question_3_button_label"),
|
||||
}),
|
||||
],
|
||||
buttonLabel: t("templates.market_site_clarity_question_3_button_label"),
|
||||
buttonLabel: t("templates.next"),
|
||||
t,
|
||||
}),
|
||||
],
|
||||
|
||||
@@ -133,13 +133,16 @@ export const BlockCard = ({
|
||||
// A button label is invalid if it exists but doesn't have valid text for all enabled languages
|
||||
const surveyLanguages = localSurvey.languages ?? [];
|
||||
const hasInvalidButtonLabel =
|
||||
block.buttonLabel !== undefined && !isLabelValidForAllLanguages(block.buttonLabel, surveyLanguages);
|
||||
block.buttonLabel !== undefined &&
|
||||
block.buttonLabel["default"]?.trim() !== "" &&
|
||||
!isLabelValidForAllLanguages(block.buttonLabel, surveyLanguages);
|
||||
|
||||
// Check if back button label is invalid
|
||||
// Back button label should exist for all blocks except the first one
|
||||
const hasInvalidBackButtonLabel =
|
||||
blockIdx > 0 &&
|
||||
block.backButtonLabel !== undefined &&
|
||||
block.backButtonLabel["default"]?.trim() !== "" &&
|
||||
!isLabelValidForAllLanguages(block.backButtonLabel, surveyLanguages);
|
||||
|
||||
// Block should be highlighted if it has invalid elements OR invalid button labels
|
||||
|
||||
@@ -513,8 +513,8 @@ export const ElementsView = ({
|
||||
id: newBlockId,
|
||||
name: getBlockName(index ?? prevSurvey.blocks.length),
|
||||
elements: [{ ...updatedElement, isDraft: true }],
|
||||
buttonLabel: createI18nString(t("templates.next"), []),
|
||||
backButtonLabel: createI18nString(t("templates.back"), []),
|
||||
buttonLabel: createI18nString("", []),
|
||||
backButtonLabel: createI18nString("", []),
|
||||
};
|
||||
|
||||
return {
|
||||
|
||||
@@ -232,7 +232,7 @@ test.describe("Survey Create & Submit Response without logic", async () => {
|
||||
for (let i = 0; i < surveys.createAndSubmit.ranking.choices.length; i++) {
|
||||
await page.getByText(surveys.createAndSubmit.ranking.choices[i]).click();
|
||||
}
|
||||
await page.locator("#questionCard-12").getByRole("button", { name: "Next" }).click();
|
||||
await page.locator("#questionCard-12").getByRole("button", { name: "Finish" }).click();
|
||||
// loading spinner -> wait for it to disappear
|
||||
await page.getByTestId("loading-spinner").waitFor({ state: "hidden" });
|
||||
});
|
||||
@@ -979,7 +979,7 @@ test.describe("Testing Survey with advanced logic", async () => {
|
||||
.fill("This is my city");
|
||||
await expect(page.getByLabel(surveys.createWithLogicAndSubmit.address.placeholder.zip)).toBeVisible();
|
||||
await page.getByLabel(surveys.createWithLogicAndSubmit.address.placeholder.zip).fill("12345");
|
||||
await page.locator("#questionCard-13").getByRole("button", { name: "Next" }).click();
|
||||
await page.locator("#questionCard-13").getByRole("button", { name: "Finish" }).click();
|
||||
|
||||
// loading spinner -> wait for it to disappear
|
||||
await page.getByTestId("loading-spinner").waitFor({ state: "hidden" });
|
||||
|
||||
60
packages/cache/src/service.ts
vendored
60
packages/cache/src/service.ts
vendored
@@ -3,7 +3,7 @@ import type { RedisClient } from "@/types/client";
|
||||
import { type CacheError, CacheErrorClass, ErrorCode, type Result, err, ok } from "@/types/error";
|
||||
import type { CacheKey } from "@/types/keys";
|
||||
import { ZCacheKey } from "@/types/keys";
|
||||
import { ZTtlMs } from "@/types/service";
|
||||
import { ZTtlMs, ZTtlMsOptional } from "@/types/service";
|
||||
import { validateInputs } from "./utils/validation";
|
||||
|
||||
/**
|
||||
@@ -116,13 +116,13 @@ export class CacheService {
|
||||
}
|
||||
|
||||
/**
|
||||
* Set a value in cache with automatic JSON serialization and TTL
|
||||
* Set a value in cache with automatic JSON serialization and optional TTL
|
||||
* @param key - Cache key to store under
|
||||
* @param value - Value to store
|
||||
* @param ttlMs - Time to live in milliseconds
|
||||
* @param ttlMs - Time to live in milliseconds (optional - if omitted, key persists indefinitely)
|
||||
* @returns Result containing void or an error
|
||||
*/
|
||||
async set(key: CacheKey, value: unknown, ttlMs: number): Promise<Result<void, CacheError>> {
|
||||
async set(key: CacheKey, value: unknown, ttlMs?: number): Promise<Result<void, CacheError>> {
|
||||
// Check Redis availability first
|
||||
if (!this.isRedisClientReady()) {
|
||||
return err({
|
||||
@@ -130,8 +130,8 @@ export class CacheService {
|
||||
});
|
||||
}
|
||||
|
||||
// Validate both key and TTL in one call
|
||||
const validation = validateInputs([key, ZCacheKey], [ttlMs, ZTtlMs]);
|
||||
// Validate key and optional TTL
|
||||
const validation = validateInputs([key, ZCacheKey], [ttlMs, ZTtlMsOptional]);
|
||||
if (!validation.ok) {
|
||||
return validation;
|
||||
}
|
||||
@@ -141,7 +141,13 @@ export class CacheService {
|
||||
const normalizedValue = value === undefined ? null : value;
|
||||
const serialized = JSON.stringify(normalizedValue);
|
||||
|
||||
await this.withTimeout(this.redis.setEx(key, Math.floor(ttlMs / 1000), serialized));
|
||||
if (ttlMs === undefined) {
|
||||
// Set without expiration (persists indefinitely)
|
||||
await this.withTimeout(this.redis.set(key, serialized));
|
||||
} else {
|
||||
// Set with expiration
|
||||
await this.withTimeout(this.redis.setEx(key, Math.floor(ttlMs / 1000), serialized));
|
||||
}
|
||||
return ok(undefined);
|
||||
} catch (error) {
|
||||
logger.error({ error, key, ttlMs }, "Cache set operation failed");
|
||||
@@ -185,6 +191,44 @@ export class CacheService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to acquire a distributed lock (atomic SET NX operation)
|
||||
* @param key - Lock key
|
||||
* @param value - Lock value (typically "locked" or instance identifier)
|
||||
* @param ttlMs - Time to live in milliseconds (lock expiration)
|
||||
* @returns Result containing boolean indicating if lock was acquired, or an error
|
||||
*/
|
||||
async tryLock(key: CacheKey, value: string, ttlMs: number): Promise<Result<boolean, CacheError>> {
|
||||
// Check Redis availability first
|
||||
if (!this.isRedisClientReady()) {
|
||||
return err({
|
||||
code: ErrorCode.RedisConnectionError,
|
||||
});
|
||||
}
|
||||
|
||||
const validation = validateInputs([key, ZCacheKey], [ttlMs, ZTtlMs]);
|
||||
if (!validation.ok) {
|
||||
return validation;
|
||||
}
|
||||
|
||||
try {
|
||||
// Use SET with NX (only set if not exists) and PX (expiration in milliseconds) for atomic lock acquisition
|
||||
const result = await this.withTimeout(
|
||||
this.redis.set(key, value, {
|
||||
NX: true,
|
||||
PX: ttlMs,
|
||||
})
|
||||
);
|
||||
// SET returns "OK" if lock was acquired, null if key already exists
|
||||
return ok(result === "OK");
|
||||
} catch (error) {
|
||||
logger.error({ error, key, ttlMs }, "Cache lock operation failed");
|
||||
return err({
|
||||
code: ErrorCode.RedisOperationError,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Cache wrapper for functions (cache-aside).
|
||||
* Never throws due to cache errors; function errors propagate without retry.
|
||||
@@ -243,7 +287,7 @@ export class CacheService {
|
||||
}
|
||||
|
||||
private async trySetCache(key: CacheKey, value: unknown, ttlMs: number): Promise<void> {
|
||||
if (typeof value === "undefined") {
|
||||
if (value === undefined) {
|
||||
return; // Skip caching undefined values
|
||||
}
|
||||
|
||||
|
||||
7
packages/cache/types/service.ts
vendored
7
packages/cache/types/service.ts
vendored
@@ -5,3 +5,10 @@ export const ZTtlMs = z
|
||||
.int()
|
||||
.min(1000, "TTL must be at least 1000ms (1 second)")
|
||||
.finite("TTL must be finite");
|
||||
|
||||
export const ZTtlMsOptional = z
|
||||
.number()
|
||||
.int()
|
||||
.min(1000, "TTL must be at least 1000ms (1 second)")
|
||||
.finite("TTL must be finite")
|
||||
.optional();
|
||||
|
||||
@@ -1,8 +1,15 @@
|
||||
import { createId } from "@paralleldrive/cuid2";
|
||||
import { logger } from "@formbricks/logger";
|
||||
import type { MigrationScript } from "../../src/scripts/migration-runner";
|
||||
import type { Block, CTAMigrationStats, SurveyRecord } from "./types";
|
||||
import { migrateQuestionsSurveyToBlocks } from "./utils";
|
||||
import type {
|
||||
Block,
|
||||
CTAMigrationStats,
|
||||
IntegrationConfig,
|
||||
IntegrationMigrationStats,
|
||||
MigratedIntegration,
|
||||
SurveyRecord,
|
||||
} from "./types";
|
||||
import { migrateIntegrationConfig, migrateQuestionsSurveyToBlocks } from "./utils";
|
||||
|
||||
export const migrateQuestionsToBlocks: MigrationScript = {
|
||||
type: "data",
|
||||
@@ -25,71 +32,198 @@ export const migrateQuestionsToBlocks: MigrationScript = {
|
||||
|
||||
if (surveys.length === 0) {
|
||||
logger.info("No surveys found that need migration");
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
logger.info(`Found ${surveys.length.toString()} surveys to migrate`);
|
||||
|
||||
logger.info(`Found ${surveys.length.toString()} surveys to migrate`);
|
||||
// 2. Process each survey
|
||||
const updates: { id: string; blocks: Block[] }[] = [];
|
||||
|
||||
// 2. Process each survey
|
||||
const updates: { id: string; blocks: Block[] }[] = [];
|
||||
|
||||
for (const survey of surveys) {
|
||||
try {
|
||||
const migrated = migrateQuestionsSurveyToBlocks(survey, createId, ctaStats);
|
||||
updates.push({
|
||||
id: migrated.id,
|
||||
blocks: migrated.blocks,
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error(error, `Failed to migrate survey ${survey.id}`);
|
||||
throw new Error(
|
||||
`Migration failed for survey ${survey.id}: ${error instanceof Error ? error.message : String(error)}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
logger.info(`Successfully processed ${updates.length.toString()} surveys`);
|
||||
|
||||
// 3. Update surveys individually for safety (avoids SQL injection risks with complex JSONB arrays)
|
||||
let updatedCount = 0;
|
||||
|
||||
for (const update of updates) {
|
||||
try {
|
||||
// PostgreSQL requires proper array format for jsonb[]
|
||||
// We need to convert the JSON array to a PostgreSQL jsonb array using array_to_json
|
||||
// The trick is to use jsonb_array_elements to convert the JSON array into rows, then array_agg to collect them back
|
||||
await tx.$executeRawUnsafe(
|
||||
`UPDATE "Survey"
|
||||
SET blocks = (
|
||||
SELECT array_agg(elem)
|
||||
FROM jsonb_array_elements($1::jsonb) AS elem
|
||||
),
|
||||
questions = '[]'::jsonb
|
||||
WHERE id = $2`,
|
||||
JSON.stringify(update.blocks),
|
||||
update.id
|
||||
);
|
||||
|
||||
updatedCount++;
|
||||
|
||||
// Log progress every 10000 surveys
|
||||
if (updatedCount % 10000 === 0) {
|
||||
logger.info(`Progress: ${updatedCount.toString()}/${updates.length.toString()} surveys updated`);
|
||||
for (const survey of surveys) {
|
||||
try {
|
||||
const migrated = migrateQuestionsSurveyToBlocks(survey, createId, ctaStats);
|
||||
updates.push({
|
||||
id: migrated.id,
|
||||
blocks: migrated.blocks,
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error(error, `Failed to migrate survey ${survey.id}`);
|
||||
throw new Error(
|
||||
`Migration failed for survey ${survey.id}: ${error instanceof Error ? error.message : String(error)}`
|
||||
);
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(error, `Failed to update survey ${update.id} in database`);
|
||||
throw new Error(
|
||||
`Database update failed for survey ${update.id}: ${error instanceof Error ? error.message : String(error)}`
|
||||
}
|
||||
|
||||
logger.info(`Successfully processed ${updates.length.toString()} surveys`);
|
||||
|
||||
// 3. Update surveys in batches using UNNEST for performance
|
||||
// Batch size of 150 balances performance with query size safety (~7.5MB per batch)
|
||||
const SURVEY_BATCH_SIZE = 150;
|
||||
let updatedCount = 0;
|
||||
|
||||
for (let i = 0; i < updates.length; i += SURVEY_BATCH_SIZE) {
|
||||
const batch = updates.slice(i, i + SURVEY_BATCH_SIZE);
|
||||
|
||||
try {
|
||||
// Build arrays for batch update
|
||||
const ids = batch.map((u) => u.id);
|
||||
const blocksJsonStrings = batch.map((u) => JSON.stringify(u.blocks));
|
||||
|
||||
// Use UNNEST to update multiple surveys in a single query
|
||||
await tx.$executeRawUnsafe(
|
||||
`UPDATE "Survey" AS s
|
||||
SET
|
||||
blocks = (
|
||||
SELECT array_agg(elem)
|
||||
FROM jsonb_array_elements(data.blocks_json::jsonb) AS elem
|
||||
),
|
||||
questions = '[]'::jsonb
|
||||
FROM (
|
||||
SELECT
|
||||
unnest($1::text[]) AS id,
|
||||
unnest($2::text[]) AS blocks_json
|
||||
) AS data
|
||||
WHERE s.id = data.id`,
|
||||
ids,
|
||||
blocksJsonStrings
|
||||
);
|
||||
|
||||
updatedCount += batch.length;
|
||||
|
||||
// Log progress
|
||||
logger.info(`Progress: ${updatedCount.toString()}/${updates.length.toString()} surveys updated`);
|
||||
} catch (error) {
|
||||
logger.error(error, `Failed to update survey batch starting at index ${i.toString()}`);
|
||||
throw new Error(
|
||||
`Database batch update failed at index ${i.toString()}: ${error instanceof Error ? error.message : String(error)}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
logger.info(`Migration complete: ${updatedCount.toString()} surveys migrated to blocks`);
|
||||
|
||||
// 4. Log CTA migration statistics
|
||||
if (ctaStats.totalCTAElements > 0) {
|
||||
logger.info(
|
||||
`CTA elements processed: ${ctaStats.totalCTAElements.toString()} total (${ctaStats.ctaWithExternalLink.toString()} with external link, ${ctaStats.ctaWithoutExternalLink.toString()} without)`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
logger.info(`Migration complete: ${updatedCount.toString()} surveys migrated to blocks`);
|
||||
// 5. Migrate Integration configs
|
||||
logger.info("Starting integration config migration");
|
||||
|
||||
// Initialize integration statistics
|
||||
const integrationStats: IntegrationMigrationStats = {
|
||||
totalIntegrations: 0,
|
||||
googleSheets: { processed: 0, skipped: 0 },
|
||||
airtable: { processed: 0, skipped: 0 },
|
||||
slack: { processed: 0, skipped: 0 },
|
||||
notion: { processed: 0, skipped: 0 },
|
||||
n8n: { skipped: 0 },
|
||||
errors: 0,
|
||||
};
|
||||
|
||||
// Query all integrations
|
||||
const integrations = await tx.$queryRaw<{ id: string; type: string; config: IntegrationConfig }[]>`
|
||||
SELECT id, type, config
|
||||
FROM "Integration"
|
||||
`;
|
||||
|
||||
integrationStats.totalIntegrations = integrations.length;
|
||||
|
||||
if (integrations.length === 0) {
|
||||
logger.info("No integrations found to migrate");
|
||||
} else {
|
||||
logger.info(`Found ${integrations.length.toString()} integrations to process`);
|
||||
|
||||
// Process integrations in memory
|
||||
const integrationUpdates: MigratedIntegration[] = [];
|
||||
|
||||
for (const integration of integrations) {
|
||||
try {
|
||||
// Config is JSON from database - cast to IntegrationConfig for runtime processing
|
||||
const result = migrateIntegrationConfig(integration.type, integration.config);
|
||||
|
||||
// Track statistics
|
||||
const typeStats = integrationStats[integration.type as keyof typeof integrationStats];
|
||||
if (typeStats && typeof typeStats === "object" && "processed" in typeStats) {
|
||||
if (result.migrated) {
|
||||
typeStats.processed++;
|
||||
integrationUpdates.push({
|
||||
id: integration.id,
|
||||
config: result.config,
|
||||
});
|
||||
} else {
|
||||
typeStats.skipped++;
|
||||
}
|
||||
} else if (integration.type === "n8n") {
|
||||
integrationStats.n8n.skipped++;
|
||||
}
|
||||
} catch (error) {
|
||||
integrationStats.errors++;
|
||||
logger.error(error, `Failed to migrate integration ${integration.id} (type: ${integration.type})`);
|
||||
throw new Error(
|
||||
`Migration failed for integration ${integration.id}: ${error instanceof Error ? error.message : String(error)}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// 4. Log CTA migration statistics
|
||||
if (ctaStats.totalCTAElements > 0) {
|
||||
logger.info(
|
||||
`CTA elements processed: ${ctaStats.totalCTAElements.toString()} total (${ctaStats.ctaWithExternalLink.toString()} with external link, ${ctaStats.ctaWithoutExternalLink.toString()} without)`
|
||||
`Processed ${integrations.length.toString()} integrations: ${integrationUpdates.length.toString()} to update, ${(integrations.length - integrationUpdates.length).toString()} skipped`
|
||||
);
|
||||
|
||||
// Update integrations using Promise.all for better throughput
|
||||
if (integrationUpdates.length > 0) {
|
||||
// Batch size of 150 provides good parallelization (~750KB per batch)
|
||||
const INTEGRATION_BATCH_SIZE = 150;
|
||||
let integrationUpdatedCount = 0;
|
||||
|
||||
for (let i = 0; i < integrationUpdates.length; i += INTEGRATION_BATCH_SIZE) {
|
||||
const batch = integrationUpdates.slice(i, i + INTEGRATION_BATCH_SIZE);
|
||||
|
||||
try {
|
||||
// Execute all updates in parallel for this batch
|
||||
await Promise.all(
|
||||
batch.map((update) =>
|
||||
tx.$executeRawUnsafe(
|
||||
`UPDATE "Integration"
|
||||
SET config = $1::jsonb
|
||||
WHERE id = $2`,
|
||||
JSON.stringify(update.config),
|
||||
update.id
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
integrationUpdatedCount += batch.length;
|
||||
|
||||
// Log progress
|
||||
logger.info(
|
||||
`Integration progress: ${integrationUpdatedCount.toString()}/${integrationUpdates.length.toString()} updated`
|
||||
);
|
||||
} catch (error) {
|
||||
logger.error(error, `Failed to update integration batch starting at index ${i.toString()}`);
|
||||
throw new Error(
|
||||
`Database update failed for integration batch at index ${i.toString()}: ${error instanceof Error ? error.message : String(error)}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
logger.info(
|
||||
`Integration migration complete: ${integrationUpdatedCount.toString()} integrations updated`
|
||||
);
|
||||
} else {
|
||||
logger.info("No integrations needed updating (all already migrated or skipped)");
|
||||
}
|
||||
|
||||
// Log detailed statistics
|
||||
logger.info(
|
||||
`Integration statistics: ` +
|
||||
`GoogleSheets: ${integrationStats.googleSheets.processed.toString()} migrated, ${integrationStats.googleSheets.skipped.toString()} skipped | ` +
|
||||
`Airtable: ${integrationStats.airtable.processed.toString()} migrated, ${integrationStats.airtable.skipped.toString()} skipped | ` +
|
||||
`Slack: ${integrationStats.slack.processed.toString()} migrated, ${integrationStats.slack.skipped.toString()} skipped | ` +
|
||||
`Notion: ${integrationStats.notion.processed.toString()} migrated, ${integrationStats.notion.skipped.toString()} skipped | ` +
|
||||
`n8n: ${integrationStats.n8n.skipped.toString()} skipped`
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -76,6 +76,151 @@ export interface CTAMigrationStats {
|
||||
ctaWithoutExternalLink: number;
|
||||
}
|
||||
|
||||
// Base integration config data (shared between all integrations except Notion)
|
||||
// This represents both old (questionIds/questions) and new (elementIds/elements) formats
|
||||
export interface IntegrationBaseSurveyData {
|
||||
createdAt: Date;
|
||||
surveyId: string;
|
||||
surveyName: string;
|
||||
// Old format fields
|
||||
questionIds?: string[];
|
||||
questions?: string;
|
||||
// New format fields
|
||||
elementIds?: string[];
|
||||
elements?: string;
|
||||
// Optional fields
|
||||
includeVariables?: boolean;
|
||||
includeHiddenFields?: boolean;
|
||||
includeMetadata?: boolean;
|
||||
includeCreatedAt?: boolean;
|
||||
}
|
||||
|
||||
// Google Sheets specific config
|
||||
export interface GoogleSheetsConfigData extends IntegrationBaseSurveyData {
|
||||
spreadsheetId: string;
|
||||
spreadsheetName: string;
|
||||
}
|
||||
|
||||
export interface GoogleSheetsConfig {
|
||||
key: {
|
||||
token_type: "Bearer";
|
||||
access_token: string;
|
||||
scope: string;
|
||||
expiry_date: number;
|
||||
refresh_token: string;
|
||||
};
|
||||
data: GoogleSheetsConfigData[];
|
||||
email: string;
|
||||
}
|
||||
|
||||
// Airtable specific config
|
||||
export interface AirtableConfigData extends IntegrationBaseSurveyData {
|
||||
tableId: string;
|
||||
baseId: string;
|
||||
tableName: string;
|
||||
}
|
||||
|
||||
export interface AirtableConfig {
|
||||
key: {
|
||||
expiry_date: string;
|
||||
access_token: string;
|
||||
refresh_token: string;
|
||||
};
|
||||
data: AirtableConfigData[];
|
||||
email: string;
|
||||
}
|
||||
|
||||
// Slack specific config
|
||||
export interface SlackConfigData extends IntegrationBaseSurveyData {
|
||||
channelId: string;
|
||||
channelName: string;
|
||||
}
|
||||
|
||||
export interface SlackConfig {
|
||||
key: {
|
||||
app_id: string;
|
||||
authed_user: { id: string };
|
||||
token_type: "bot";
|
||||
access_token: string;
|
||||
bot_user_id: string;
|
||||
team: { id: string; name: string };
|
||||
};
|
||||
data: SlackConfigData[];
|
||||
}
|
||||
|
||||
// Notion specific config (different structure - uses mapping instead of elementIds/elements)
|
||||
export interface NotionMappingItem {
|
||||
// Old format
|
||||
question?: { id: string; name: string; type: string };
|
||||
// New format
|
||||
element?: { id: string; name: string; type: string };
|
||||
column: { id: string; name: string; type: string };
|
||||
}
|
||||
|
||||
export interface NotionConfigData {
|
||||
createdAt: Date;
|
||||
surveyId: string;
|
||||
surveyName: string;
|
||||
mapping: NotionMappingItem[];
|
||||
databaseId: string;
|
||||
databaseName: string;
|
||||
}
|
||||
|
||||
export interface NotionConfig {
|
||||
key: {
|
||||
access_token: string;
|
||||
bot_id: string;
|
||||
token_type: string;
|
||||
duplicated_template_id: string | null;
|
||||
owner: {
|
||||
type: string;
|
||||
workspace?: boolean | null;
|
||||
user: {
|
||||
id: string;
|
||||
name?: string | null;
|
||||
type?: string | null;
|
||||
object: string;
|
||||
person?: { email: string } | null;
|
||||
avatar_url?: string | null;
|
||||
} | null;
|
||||
};
|
||||
workspace_icon: string | null;
|
||||
workspace_id: string;
|
||||
workspace_name: string | null;
|
||||
};
|
||||
data: NotionConfigData[];
|
||||
}
|
||||
|
||||
// Union type for all integration configs
|
||||
export type IntegrationConfig =
|
||||
| GoogleSheetsConfig
|
||||
| AirtableConfig
|
||||
| SlackConfig
|
||||
| NotionConfig
|
||||
| Record<string, unknown>;
|
||||
|
||||
// Integration migration types
|
||||
export interface IntegrationRecord {
|
||||
id: string;
|
||||
type: string;
|
||||
config: IntegrationConfig;
|
||||
}
|
||||
|
||||
export interface MigratedIntegration {
|
||||
id: string;
|
||||
config: IntegrationConfig;
|
||||
}
|
||||
|
||||
export interface IntegrationMigrationStats {
|
||||
totalIntegrations: number;
|
||||
googleSheets: { processed: number; skipped: number };
|
||||
airtable: { processed: number; skipped: number };
|
||||
slack: { processed: number; skipped: number };
|
||||
notion: { processed: number; skipped: number };
|
||||
n8n: { skipped: number };
|
||||
errors: number;
|
||||
}
|
||||
|
||||
// Type guards
|
||||
export const isSingleCondition = (condition: Condition): condition is SingleCondition => {
|
||||
return "leftOperand" in condition && "operator" in condition;
|
||||
|
||||
@@ -3,8 +3,10 @@ import {
|
||||
type CTAMigrationStats,
|
||||
type Condition,
|
||||
type ConditionGroup,
|
||||
type IntegrationConfig,
|
||||
type LogicAction,
|
||||
type MigratedSurvey,
|
||||
type NotionConfig,
|
||||
type SingleCondition,
|
||||
type SurveyLogic,
|
||||
type SurveyQuestion,
|
||||
@@ -414,3 +416,198 @@ export const migrateQuestionsSurveyToBlocks = (
|
||||
blocks,
|
||||
};
|
||||
};
|
||||
|
||||
// Type guard for config items with data array
|
||||
interface ConfigWithData {
|
||||
data: Record<string, unknown>[];
|
||||
[key: string]: unknown;
|
||||
}
|
||||
|
||||
const hasDataArray = (config: unknown): config is ConfigWithData => {
|
||||
return (
|
||||
typeof config === "object" &&
|
||||
config !== null &&
|
||||
"data" in config &&
|
||||
Array.isArray((config as ConfigWithData).data)
|
||||
);
|
||||
};
|
||||
|
||||
/**
|
||||
* Check if config item is already migrated (has elementIds/elements)
|
||||
*/
|
||||
const isAlreadyMigrated = (item: Record<string, unknown>): boolean => {
|
||||
return "elementIds" in item || "elements" in item;
|
||||
};
|
||||
|
||||
/**
|
||||
* Check if config item needs migration (has questionIds/questions)
|
||||
*/
|
||||
const needsMigration = (item: Record<string, unknown>): boolean => {
|
||||
return "questionIds" in item || "questions" in item;
|
||||
};
|
||||
|
||||
/**
|
||||
* Migrate Airtable/Google Sheets/Slack config (shared base type)
|
||||
* Returns an object with migrated flag and updated config
|
||||
*/
|
||||
export const migrateSharedIntegrationConfig = (
|
||||
config: IntegrationConfig
|
||||
): { migrated: boolean; config: IntegrationConfig } => {
|
||||
// Validate config structure
|
||||
if (!hasDataArray(config)) {
|
||||
return { migrated: false, config };
|
||||
}
|
||||
|
||||
let anyMigrated = false;
|
||||
|
||||
const newData = config.data.map((item) => {
|
||||
// Skip if already migrated
|
||||
if (isAlreadyMigrated(item)) {
|
||||
return item;
|
||||
}
|
||||
|
||||
// Skip if nothing to migrate
|
||||
if (!needsMigration(item)) {
|
||||
return item;
|
||||
}
|
||||
|
||||
anyMigrated = true;
|
||||
const migrated: Record<string, unknown> = { ...item };
|
||||
|
||||
// Rename questionIds to elementIds
|
||||
if ("questionIds" in migrated) {
|
||||
migrated.elementIds = migrated.questionIds;
|
||||
delete migrated.questionIds;
|
||||
}
|
||||
|
||||
// Rename questions to elements
|
||||
if ("questions" in migrated) {
|
||||
migrated.elements = migrated.questions;
|
||||
delete migrated.questions;
|
||||
}
|
||||
|
||||
// All other fields (includeVariables, etc.) are preserved automatically via spread
|
||||
|
||||
return migrated;
|
||||
});
|
||||
|
||||
return {
|
||||
migrated: anyMigrated,
|
||||
config: { ...config, data: newData },
|
||||
};
|
||||
};
|
||||
|
||||
// Type guard for Notion config
|
||||
const isNotionConfig = (config: unknown): config is NotionConfig => {
|
||||
return (
|
||||
typeof config === "object" &&
|
||||
config !== null &&
|
||||
"data" in config &&
|
||||
Array.isArray((config as NotionConfig).data)
|
||||
);
|
||||
};
|
||||
|
||||
// Type for Notion mapping entry
|
||||
interface NotionMappingEntry {
|
||||
question?: { id: string; name: string; type: string };
|
||||
element?: { id: string; name: string; type: string };
|
||||
column: { id: string; name: string; type: string };
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if Notion config item has any mapping entries that need migration
|
||||
* @param mapping - Notion mapping entries
|
||||
* @returns boolean
|
||||
*/
|
||||
const needsNotionMigration = (mapping: NotionMappingEntry[] | undefined): boolean => {
|
||||
if (!mapping || !Array.isArray(mapping) || mapping.length === 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Check if ANY mapping item has "question" field (needs migration)
|
||||
return mapping.some((mapItem) => "question" in mapItem && !("element" in mapItem));
|
||||
};
|
||||
|
||||
/**
|
||||
* Migrate Notion config (custom mapping structure)
|
||||
* @param config - Notion config
|
||||
* @returns \{ migrated: boolean; config: IntegrationConfig \}
|
||||
*/
|
||||
export const migrateNotionIntegrationConfig = (
|
||||
config: IntegrationConfig
|
||||
): { migrated: boolean; config: IntegrationConfig } => {
|
||||
// Validate config structure
|
||||
if (!isNotionConfig(config)) {
|
||||
return { migrated: false, config };
|
||||
}
|
||||
|
||||
let anyMigrated = false;
|
||||
|
||||
const newData = config.data.map((item) => {
|
||||
// Cast mapping to the migration type that includes both old and new formats
|
||||
const mapping = item.mapping as NotionMappingEntry[] | undefined;
|
||||
|
||||
// Skip if nothing to migrate
|
||||
if (!needsNotionMigration(mapping)) {
|
||||
return item;
|
||||
}
|
||||
|
||||
anyMigrated = true;
|
||||
|
||||
// Migrate mapping array - check EACH item individually
|
||||
const newMapping = mapping?.map((mapItem) => {
|
||||
// Already has element field - skip this item
|
||||
if ("element" in mapItem) {
|
||||
return mapItem;
|
||||
}
|
||||
|
||||
// Has question field - migrate it
|
||||
if ("question" in mapItem) {
|
||||
const { question, ...rest } = mapItem;
|
||||
return {
|
||||
...rest,
|
||||
element: question,
|
||||
};
|
||||
}
|
||||
|
||||
// Neither element nor question - return as is
|
||||
return mapItem;
|
||||
});
|
||||
|
||||
return {
|
||||
...item,
|
||||
mapping: newMapping,
|
||||
};
|
||||
});
|
||||
|
||||
return {
|
||||
migrated: anyMigrated,
|
||||
config: { ...config, data: newData },
|
||||
};
|
||||
};
|
||||
|
||||
/**
|
||||
* Migrate integration config based on type
|
||||
* @param type - Integration type
|
||||
* @param config - Integration config
|
||||
* @returns \{ migrated: boolean; config: IntegrationConfig \}
|
||||
*/
|
||||
export const migrateIntegrationConfig = (
|
||||
type: string,
|
||||
config: IntegrationConfig
|
||||
): { migrated: boolean; config: IntegrationConfig } => {
|
||||
switch (type) {
|
||||
case "googleSheets":
|
||||
case "airtable":
|
||||
case "slack":
|
||||
return migrateSharedIntegrationConfig(config);
|
||||
case "notion":
|
||||
return migrateNotionIntegrationConfig(config);
|
||||
case "n8n":
|
||||
// n8n has no config schema to migrate
|
||||
return { migrated: false, config };
|
||||
default:
|
||||
// Unknown type - return unchanged
|
||||
return { migrated: false, config };
|
||||
}
|
||||
};
|
||||
|
||||
@@ -99,7 +99,7 @@ export function LanguageSwitch({
|
||||
{showLanguageDropdown ? (
|
||||
<div
|
||||
className={cn(
|
||||
"fb-bg-input-bg fb-text-heading fb-absolute fb-top-10 fb-max-h-64 fb-space-y-2 fb-overflow-auto fb-rounded-md fb-p-2 fb-text-xs",
|
||||
"fb-bg-input-bg fb-text-heading fb-absolute fb-top-10 fb-max-h-64 fb-space-y-2 fb-overflow-auto fb-rounded-md fb-p-2 fb-text-xs fb-border-border fb-border",
|
||||
dir === "rtl" ? "fb-left-8" : "fb-right-8"
|
||||
)}
|
||||
ref={languageDropdownRef}>
|
||||
|
||||
@@ -1356,6 +1356,8 @@ export const ZSurvey = z
|
||||
}
|
||||
|
||||
if (
|
||||
!isBackButtonHidden &&
|
||||
blockIndex > 0 &&
|
||||
block.backButtonLabel?.[defaultLanguageCode] &&
|
||||
block.backButtonLabel[defaultLanguageCode].trim() !== ""
|
||||
) {
|
||||
|
||||
Reference in New Issue
Block a user