diff --git a/.github/actions/cache-build-web/action.yml b/.github/actions/cache-build-web/action.yml
index 4db8d682c6..a73a3da185 100644
--- a/.github/actions/cache-build-web/action.yml
+++ b/.github/actions/cache-build-web/action.yml
@@ -1,14 +1,10 @@
 name: Build & Cache Web App
 
-on:
-  workflow_dispatch:
-    inputs:
-      e2e_testing_mode:
-        description: "Set E2E Testing Mode"
-        required: false
-        default: "0"
-
 inputs:
+  e2e_testing_mode:
+    description: "Set E2E Testing Mode"
+    required: false
+    default: "0"
   turbo_token:
     description: "Turborepo token"
     required: false
diff --git a/apps/web/instrumentation-jobs.test.ts b/apps/web/instrumentation-jobs.test.ts
index 942a93baed..febb55a770 100644
--- a/apps/web/instrumentation-jobs.test.ts
+++ b/apps/web/instrumentation-jobs.test.ts
@@ -4,6 +4,10 @@ const mockStartJobsRuntime = vi.fn();
 const mockDebug = vi.fn();
 const mockError = vi.fn();
 const mockGetJobsWorkerBootstrapConfig = vi.fn();
+const TEST_TIMEOUT_MS = 15_000;
+const slowTest = (name: string, fn: () => Promise<void>): void => {
+  test(name, fn, TEST_TIMEOUT_MS);
+};
 
 vi.mock("@formbricks/jobs", () => ({
   startJobsRuntime: mockStartJobsRuntime,
@@ -33,7 +37,7 @@ describe("instrumentation-jobs", () => {
     await resetJobsWorkerRegistrationForTests();
   });
 
-  test("skips worker startup when disabled", async () => {
+  slowTest("skips worker startup when disabled", async () => {
    mockGetJobsWorkerBootstrapConfig.mockReturnValue({
      enabled: false,
      runtimeOptions: null,
@@ -47,7 +51,7 @@
     expect(mockDebug).toHaveBeenCalledWith("BullMQ worker startup skipped");
   });
 
-  test("starts the worker only once", async () => {
+  slowTest("starts the worker only once", async () => {
    const mockRuntime = {
      close: vi.fn().mockResolvedValue(undefined),
    };
@@ -77,7 +81,7 @@
     });
   });
 
-  test("logs and rethrows startup failures", async () => {
+  slowTest("logs and rethrows startup failures", async () => {
    const startupError = new Error("startup failed");
 
    mockGetJobsWorkerBootstrapConfig.mockReturnValue({
diff --git a/apps/web/instrumentation-jobs.ts b/apps/web/instrumentation-jobs.ts
index d7fddeca0d..9ea1807b38 100644
--- a/apps/web/instrumentation-jobs.ts
+++ b/apps/web/instrumentation-jobs.ts
@@ -3,14 +3,13 @@ import { startJobsRuntime } from "@formbricks/jobs";
 import { logger } from "@formbricks/logger";
 import { getJobsWorkerBootstrapConfig } from "@/lib/jobs/config";
 
-const globalForJobsRuntime = globalThis as {
-  formbricksJobsRuntime: JobsRuntimeHandle | undefined;
-  formbricksJobsRuntimeInitializing: Promise<JobsRuntimeHandle> | undefined;
-} as unknown as {
+type TJobsRuntimeGlobal = typeof globalThis & {
   formbricksJobsRuntime: JobsRuntimeHandle | undefined;
   formbricksJobsRuntimeInitializing: Promise<JobsRuntimeHandle> | undefined;
 };
 
+const globalForJobsRuntime = globalThis as TJobsRuntimeGlobal;
+
 export const registerJobsWorker = async (): Promise<void> => {
   const jobsWorkerBootstrapConfig = getJobsWorkerBootstrapConfig();
 
diff --git a/apps/web/instrumentation.ts b/apps/web/instrumentation.ts
index 5d74ad63dc..a7e5c1fc54 100644
--- a/apps/web/instrumentation.ts
+++ b/apps/web/instrumentation.ts
@@ -1,5 +1,6 @@
 import * as Sentry from "@sentry/nextjs";
 import { type Instrumentation } from "next";
+import { logger } from "@formbricks/logger";
 import { isExpectedError } from "@formbricks/types/errors";
 import { IS_PRODUCTION, PROMETHEUS_ENABLED, SENTRY_DSN } from "@/lib/constants";
 
@@ -22,8 +23,12 @@ export const register = async () => {
     await import("./instrumentation-node");
   }
 
-  const { registerJobsWorker } = await import("./instrumentation-jobs");
-  await registerJobsWorker();
+  try {
+    const { registerJobsWorker } = await import("./instrumentation-jobs");
+    await registerJobsWorker();
+  } catch (error) {
+    logger.error({ err: error }, "BullMQ worker registration failed during Next.js instrumentation");
+  }
 }
 
 // Sentry init loads after OTEL to avoid TracerProvider conflicts
 // Sentry tracing is disabled (tracesSampleRate: 0) -- SigNoz handles distributed tracing
diff --git a/apps/web/lib/env.test.ts b/apps/web/lib/env.test.ts
new file mode 100644
index 0000000000..4e2f8f9096
--- /dev/null
+++ b/apps/web/lib/env.test.ts
@@ -0,0 +1,33 @@
+import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
+
+const ORIGINAL_ENV = process.env;
+
+const setTestEnv = (overrides: Record<string, string> = {}) => {
+  process.env = {
+    ...ORIGINAL_ENV,
+    NODE_ENV: "test",
+    DATABASE_URL: "https://example.com/db",
+    ENCRYPTION_KEY: "12345678901234567890123456789012",
+    ...overrides,
+  };
+};
+
+describe("env", () => {
+  beforeEach(() => {
+    vi.resetModules();
+  });
+
+  afterEach(() => {
+    process.env = ORIGINAL_ENV;
+  });
+
+  test("allows ambient DEBUG values from external tooling", async () => {
+    setTestEnv({
+      DEBUG: "pnpm:*",
+    });
+
+    const { env } = await import("./env");
+
+    expect(env.DEBUG).toBe("pnpm:*");
+  });
+});
diff --git a/apps/web/lib/env.ts b/apps/web/lib/env.ts
index edf23208e4..499e2080ee 100644
--- a/apps/web/lib/env.ts
+++ b/apps/web/lib/env.ts
@@ -15,7 +15,9 @@ export const env = createEnv({
     BREVO_API_KEY: z.string().optional(),
     BREVO_LIST_ID: z.string().optional(),
     DATABASE_URL: z.url(),
-    DEBUG: z.enum(["1", "0"]).optional(),
+    // DEBUG is a common ambient env var in CI/tooling, so we accept arbitrary strings here
+    // and only treat "1" as enabling Formbricks-specific debug behavior downstream.
+    DEBUG: z.string().optional(),
     AUTH_DEFAULT_TEAM_ID: z.string().optional(),
     AUTH_SKIP_INVITE_FOR_SSO: z.enum(["1", "0"]).optional(),
     BULLMQ_WORKER_CONCURRENCY: z.coerce.number().int().min(1).optional(),
diff --git a/apps/web/lib/jobs/config.test.ts b/apps/web/lib/jobs/config.test.ts
index c3b2599157..9e7b25a05d 100644
--- a/apps/web/lib/jobs/config.test.ts
+++ b/apps/web/lib/jobs/config.test.ts
@@ -69,4 +69,23 @@
       },
     });
   });
+
+  test("respects explicit worker disable flag outside tests", async () => {
+    vi.doMock("@/lib/env", () => ({
+      env: {
+        BULLMQ_WORKER_CONCURRENCY: 6,
+        BULLMQ_WORKER_COUNT: 3,
+        BULLMQ_WORKER_ENABLED: "0",
+        NODE_ENV: "production",
+        REDIS_URL: "redis://cache.internal:6379",
+      },
+    }));
+
+    const { getJobsWorkerBootstrapConfig } = await import("./config");
+
+    expect(getJobsWorkerBootstrapConfig()).toEqual({
+      enabled: false,
+      runtimeOptions: null,
+    });
+  });
 });
diff --git a/packages/jobs/src/connection.test.ts b/packages/jobs/src/connection.test.ts
index 7f50db08eb..fa43f5851a 100644
--- a/packages/jobs/src/connection.test.ts
+++ b/packages/jobs/src/connection.test.ts
@@ -78,6 +78,12 @@ describe("@formbricks/jobs connection helpers", () => {
     expect(() => getRedisUrlFromEnv()).toThrow("REDIS_URL is required for BullMQ");
   });
 
+  test("throws when REDIS_URL is malformed", () => {
+    process.env.REDIS_URL = "not-a-url";
+
+    expect(() => getRedisUrlFromEnv()).toThrow("REDIS_URL must be a valid URL for BullMQ");
+  });
+
   test("quits an active Redis connection", async () => {
     await closeRedisConnection({
       quit: mockQuit.mockResolvedValue(undefined),
@@ -88,4 +94,15 @@
     expect(mockQuit).toHaveBeenCalledTimes(1);
     expect(mockDisconnect).not.toHaveBeenCalled();
   });
+
+  test("disconnects a non-ready Redis connection", async () => {
+    await closeRedisConnection({
+      quit: mockQuit,
+      disconnect: mockDisconnect,
+      status: "connecting",
+    } as unknown as IORedis);
+
+    expect(mockQuit).not.toHaveBeenCalled();
+    expect(mockDisconnect).toHaveBeenCalledTimes(1);
+  });
 });
diff --git a/packages/jobs/src/connection.ts b/packages/jobs/src/connection.ts
index 176bb8a482..96762b12e9 100644
--- a/packages/jobs/src/connection.ts
+++ b/packages/jobs/src/connection.ts
@@ -55,6 +55,10 @@ export const getRedisUrlFromEnv = (): string => {
     throw new Error("REDIS_URL is required for BullMQ");
   }
 
+  if (!URL.canParse(redisUrl)) {
+    throw new Error("REDIS_URL must be a valid URL for BullMQ");
+  }
+
   return redisUrl;
 };
 
@@ -63,6 +67,11 @@ export const closeRedisConnection = async (connection: IORedis): Promise<void> => {
     return;
   }
 
+  if (connection.status !== "ready") {
+    connection.disconnect();
+    return;
+  }
+
   try {
     await connection.quit();
   } catch {
diff --git a/packages/jobs/src/jobs-integration.test.ts b/packages/jobs/src/jobs-integration.test.ts
index 649e748d67..bf75f84b72 100644
--- a/packages/jobs/src/jobs-integration.test.ts
+++ b/packages/jobs/src/jobs-integration.test.ts
@@ -18,13 +18,22 @@ let queueEventsConnection: IORedis | null = null;
 let isRedisAvailable = false;
 
 async function isQueueReady(url: string): Promise<boolean> {
+  let probe: Awaited<ReturnType<typeof startJobsRuntime>> | null = null;
+
   try {
-    const probe = await startJobsRuntime({ redisUrl: url });
-    await probe.close();
+    probe = await startJobsRuntime({ redisUrl: url });
     return true;
   } catch (error) {
     logger.info({ error }, "BullMQ integration tests skipped because Redis is not available");
     return false;
+  } finally {
+    if (probe) {
+      try {
+        await probe.close();
+      } catch (error) {
+        logger.warn({ err: error }, "Failed to close BullMQ runtime probe cleanly");
+      }
+    }
   }
 }
 
diff --git a/packages/jobs/src/processors.test.ts b/packages/jobs/src/processors.test.ts
index b5c272dc4c..ed4780173f 100644
--- a/packages/jobs/src/processors.test.ts
+++ b/packages/jobs/src/processors.test.ts
@@ -47,6 +47,49 @@ describe("@formbricks/jobs processor registry", () => {
     );
   });
 
+  test("fails fast for the unimplemented response pipeline processor", async () => {
+    await expect(
+      processJob({
+        attemptsMade: 0,
+        data: {
+          environmentId: "env_123",
+          event: "responseCreated",
+          response: {
+            contact: null,
+            contactAttributes: null,
+            createdAt: new Date("2026-04-07T10:00:00.000Z"),
+            data: {},
+            displayId: null,
+            endingId: null,
+            finished: false,
+            id: "cm8cmpnjj000108jfdr9dfqe6",
+            language: null,
+            meta: {},
+            singleUseId: null,
+            surveyId: "cm8cmpnjj000108jfdr9dfqe7",
+            tags: [],
+            updatedAt: new Date("2026-04-07T10:00:00.000Z"),
+            variables: {},
+          },
+          surveyId: "survey_123",
+        },
+        id: "job-3",
+        name: JOB_NAMES.responsePipeline,
+        queueName: "background-jobs",
+      } as never)
+    ).rejects.toThrow("Unimplemented response pipeline processor");
+
+    expect(mockError).toHaveBeenCalledWith(
+      expect.objectContaining({
+        environmentId: "env_123",
+        jobId: "job-3",
+        jobName: JOB_NAMES.responsePipeline,
+        surveyId: "survey_123",
+      }),
+      "Unimplemented response pipeline processor"
+    );
+  });
+
   test("throws for unknown jobs", async () => {
     await expect(
       processJob({
diff --git a/packages/jobs/src/processors/response-pipeline.ts b/packages/jobs/src/processors/response-pipeline.ts
index 8f53616034..136fb52cfe 100644
--- a/packages/jobs/src/processors/response-pipeline.ts
+++ b/packages/jobs/src/processors/response-pipeline.ts
@@ -3,7 +3,7 @@ import type { JobHandler } from "@/src/contracts";
 import type { TResponsePipelineJobData } from "@/src/types";
 
 export const processResponsePipelineJob: JobHandler<TResponsePipelineJobData> = (data, context) => {
-  logger.debug(
+  logger.error(
     {
       attempt: context.attempt,
       environmentId: data.environmentId,
@@ -13,8 +13,10 @@
       jobName: context.jobName,
       queueName: context.queueName,
     },
-    "Processed placeholder response pipeline job"
+    "Unimplemented response pipeline processor"
   );
 
-  return Promise.resolve();
+  throw new Error(
+    `Unimplemented response pipeline processor for job ${context.jobId} (${data.environmentId}/${data.surveyId})`
+  );
 };
diff --git a/packages/jobs/src/queue.test.ts b/packages/jobs/src/queue.test.ts
index 749640e62b..81d3ac8947 100644
--- a/packages/jobs/src/queue.test.ts
+++ b/packages/jobs/src/queue.test.ts
@@ -21,12 +21,14 @@ import { getRecurringJobSchedulerId } from "./schedules";
 const {
   mockCloseRedisConnection,
+  mockLoggerError,
   mockQueueAdd,
   mockQueueClose,
   mockQueueUpsertJobScheduler,
   mockQueueWaitUntilReady,
 } = vi.hoisted(() => ({
   mockCloseRedisConnection: vi.fn(),
+  mockLoggerError: vi.fn(),
   mockQueueAdd: vi.fn(),
   mockQueueClose: vi.fn(),
   mockQueueUpsertJobScheduler: vi.fn(),
   mockQueueWaitUntilReady: vi.fn(),
 }));
@@ -42,7 +44,7 @@ const mockConnection = {
 
 vi.mock("@formbricks/logger", () => ({
   logger: {
-    error: vi.fn(),
+    error: mockLoggerError,
     info: vi.fn(),
     warn: vi.fn(),
     debug: vi.fn(),
@@ -221,4 +223,18 @@
     expect(mockQueueClose).toHaveBeenCalledTimes(1);
     expect(mockCloseRedisConnection).toHaveBeenCalledWith(mockConnection);
   });
+
+  test("keeps cleaning up when queue shutdown fails during reset", async () => {
+    await getJobsQueue();
+    mockQueueClose.mockRejectedValueOnce(new Error("queue close failed"));
+
+    await expect(resetJobsQueueFactory()).resolves.toBeUndefined();
+
+    expect(mockQueueClose).toHaveBeenCalledTimes(1);
+    expect(mockCloseRedisConnection).toHaveBeenCalledWith(mockConnection);
+    expect(mockLoggerError).toHaveBeenCalledTimes(1);
+    const [context, message] = mockLoggerError.mock.calls[0] as [{ err: Error }, string];
+    expect(context.err).toBeInstanceOf(Error);
+    expect(message).toBe("Failed to close BullMQ producer queue during reset");
+  });
 });
diff --git a/packages/jobs/src/queue.ts b/packages/jobs/src/queue.ts
index 13d950cded..19fb429482 100644
--- a/packages/jobs/src/queue.ts
+++ b/packages/jobs/src/queue.ts
@@ -293,12 +293,20 @@ export const getBackgroundJobProducer = (): BackgroundJobProducer => ({
 });
 
 export const resetJobsQueueFactory = async (): Promise<void> => {
-  if (queueSingleton) {
-    await queueSingleton.close();
+  try {
+    if (queueSingleton) {
+      await queueSingleton.close();
+    }
+  } catch (error) {
+    logger.error({ err: error }, "Failed to close BullMQ producer queue during reset");
   }
 
-  if (connectionSingleton) {
-    await closeRedisConnection(connectionSingleton);
+  try {
+    if (connectionSingleton) {
+      await closeRedisConnection(connectionSingleton);
+    }
+  } catch (error) {
+    logger.error({ err: error }, "Failed to close BullMQ producer connection during reset");
   }
 
   queueSingleton = undefined;
diff --git a/packages/jobs/src/runtime.ts b/packages/jobs/src/runtime.ts
index 3693d85d0c..4d56356758 100644
--- a/packages/jobs/src/runtime.ts
+++ b/packages/jobs/src/runtime.ts
@@ -120,11 +120,15 @@
   };
 
   const handleSigterm = (): void => {
-    void closeRuntime();
+    closeRuntime().catch((error: unknown) => {
+      logger.error({ err: error }, "BullMQ shutdown failed in closeRuntime after SIGTERM");
+    });
   };
 
   const handleSigint = (): void => {
-    void closeRuntime();
+    closeRuntime().catch((error: unknown) => {
+      logger.error({ err: error }, "BullMQ shutdown failed in closeRuntime after SIGINT");
+    });
   };
 
   try {
diff --git a/packages/jobs/src/schedules.ts b/packages/jobs/src/schedules.ts
index 23e62d8b18..652ef2a9d7 100644
--- a/packages/jobs/src/schedules.ts
+++ b/packages/jobs/src/schedules.ts
@@ -14,6 +14,9 @@
   startAt: ZValidDate.optional(),
 } as const;
 
+const hasValidScheduleWindow = (value: { startAt?: Date; endAt?: Date }): boolean =>
+  !value.startAt || !value.endAt || value.endAt.getTime() > value.startAt.getTime();
+
 export const ZBackgroundJobScheduleId = z.string().trim().min(1);
 export const ZBackgroundJobScheduleScope = z.string().trim().min(1);
 export const ZBackgroundJobScheduleIdentity = z.object({
@@ -35,7 +38,7 @@ export const ZRecurringEveryBackgroundJobSchedule = z
     everyMs: ZPositiveInteger,
     kind: z.literal("every"),
   })
-  .refine((value) => !value.startAt || !value.endAt || value.endAt.getTime() > value.startAt.getTime(), {
+  .refine(hasValidScheduleWindow, {
     message: "endAt must be after startAt",
     path: ["endAt"],
   });
@@ -48,7 +51,7 @@ export const ZRecurringCronBackgroundJobSchedule = z
     kind: z.literal("cron"),
     timeZone: z.string().trim().min(1).optional(),
   })
-  .refine((value) => !value.startAt || !value.endAt || value.endAt.getTime() > value.startAt.getTime(), {
+  .refine(hasValidScheduleWindow, {
     message: "endAt must be after startAt",
     path: ["endAt"],
   });
diff --git a/packages/jobs/src/types.ts b/packages/jobs/src/types.ts
index 09b89f4362..dd72506472 100644
--- a/packages/jobs/src/types.ts
+++ b/packages/jobs/src/types.ts
@@ -1,5 +1,4 @@
 import { z } from "zod";
-import { ZWebhook } from "@formbricks/database/zod/webhooks";
 import { ZResponse } from "@formbricks/types/responses";
 
 export const ZTestLogJobData = z.object({
@@ -10,8 +9,12 @@
 
 export type TTestLogJobData = z.infer<typeof ZTestLogJobData>;
 
+export const ZResponsePipelineEvent = z.enum(["responseFinished", "responseCreated", "responseUpdated"]);
+
+export type TResponsePipelineEvent = z.infer<typeof ZResponsePipelineEvent>;
+
 export const ZResponsePipelineJobData = z.object({
-  event: ZWebhook.shape.triggers.element,
+  event: ZResponsePipelineEvent,
   response: ZResponse,
   environmentId: z.string().min(1),
   surveyId: z.string().min(1),
diff --git a/packages/jobs/vite.config.ts b/packages/jobs/vite.config.ts
index 66846445a5..eacefcd25b 100644
--- a/packages/jobs/vite.config.ts
+++ b/packages/jobs/vite.config.ts
@@ -30,7 +30,8 @@ export default defineConfig({
   plugins: [
     dts({
       include: ["src/**/*"],
-      entryRoot: ".",
+      exclude: ["src/**/*.test.ts"],
+      entryRoot: "src",
       outDir: "dist",
     }),
   ],
diff --git a/packages/storage/src/client.ts b/packages/storage/src/client.ts
index fc48827951..82b735780b 100644
--- a/packages/storage/src/client.ts
+++ b/packages/storage/src/client.ts
@@ -1,6 +1,5 @@
 import { S3Client, type S3ClientConfig } from "@aws-sdk/client-s3";
 import { logger } from "@formbricks/logger";
-import { type Result, type StorageError, StorageErrorCode, err, ok } from "../types/error";
 import {
   S3_ACCESS_KEY,
   S3_BUCKET_NAME,
@@ -9,6 +8,7 @@
   S3_REGION,
   S3_SECRET_KEY,
 } from "./constants";
+import { type Result, type StorageError, StorageErrorCode, err, ok } from "./types/error";
 
 // Cached singleton instance of S3Client
 let cachedS3Client: S3Client | undefined;
diff --git a/packages/storage/src/index.ts b/packages/storage/src/index.ts
index ae23f0e667..9ea017dad5 100644
--- a/packages/storage/src/index.ts
+++ b/packages/storage/src/index.ts
@@ -6,4 +6,4 @@ export {
   getFileStream,
   type FileStreamResult,
 } from "./service";
-export { type StorageError, StorageErrorCode } from "../types/error";
+export { type StorageError, StorageErrorCode } from "./types/error";
diff --git a/packages/storage/src/service.ts b/packages/storage/src/service.ts
index ee7e0c647d..03138dc046 100644
--- a/packages/storage/src/service.ts
+++ b/packages/storage/src/service.ts
@@ -13,9 +13,9 @@ import {
 } from "@aws-sdk/s3-presigned-post";
 import { getSignedUrl } from "@aws-sdk/s3-request-presigner";
 import { logger } from "@formbricks/logger";
-import { type Result, type StorageError, StorageErrorCode, err, ok } from "../types/error";
 import { createS3Client } from "./client";
 import { S3_BUCKET_NAME } from "./constants";
+import { type Result, type StorageError, StorageErrorCode, err, ok } from "./types/error";
 
 /**
  * Get a signed URL for uploading a file to S3
diff --git a/packages/storage/types/error.ts b/packages/storage/src/types/error.ts
similarity index 100%
rename from packages/storage/types/error.ts
rename to packages/storage/src/types/error.ts
diff --git a/packages/storage/tsconfig.json b/packages/storage/tsconfig.json
index 3e48f73289..e135cf0335 100644
--- a/packages/storage/tsconfig.json
+++ b/packages/storage/tsconfig.json
@@ -8,5 +8,5 @@
   },
   "exclude": ["node_modules"],
   "extends": "@formbricks/config-typescript/js-library.json",
-  "include": ["src", "package.json", "types"]
+  "include": ["src", "package.json"]
 }
diff --git a/packages/storage/vite.config.ts b/packages/storage/vite.config.ts
index 38941072d1..15f4a0c596 100644
--- a/packages/storage/vite.config.ts
+++ b/packages/storage/vite.config.ts
@@ -19,6 +19,7 @@ export default defineConfig({
       "@formbricks/logger",
     ],
   },
+    emptyOutDir: false,
   },
   test: {
     environment: "node",
@@ -29,5 +30,11 @@
       include: ["src/**/*.ts"],
     },
   },
-  plugins: [dts({ rollupTypes: true }) as PluginOption],
+  plugins: [
+    dts({
+      include: ["src/**/*"],
+      entryRoot: ".",
+      outDir: "dist",
+    }) as PluginOption,
+  ],
 });