chore: addressed PR concerns

This commit is contained in:
Tiago Farto
2026-04-07 14:18:52 +03:00
parent f42a8822a9
commit 29a08151aa
24 changed files with 220 additions and 40 deletions

View File

@@ -1,14 +1,10 @@
name: Build & Cache Web App
on:
workflow_dispatch:
inputs:
e2e_testing_mode:
description: "Set E2E Testing Mode"
required: false
default: "0"
inputs:
e2e_testing_mode:
description: "Set E2E Testing Mode"
required: false
default: "0"
turbo_token:
description: "Turborepo token"
required: false

View File

@@ -4,6 +4,10 @@ const mockStartJobsRuntime = vi.fn();
const mockDebug = vi.fn();
const mockError = vi.fn();
const mockGetJobsWorkerBootstrapConfig = vi.fn();
// 15s upper bound shared by the slower worker-lifecycle specs below.
const TEST_TIMEOUT_MS = 15_000;

// Drop-in replacement for vitest's `test` that applies TEST_TIMEOUT_MS as the
// per-test timeout (passed as the third positional argument to `test`).
const slowTest = (name: string, fn: () => Promise<void>): void => {
  test(name, fn, TEST_TIMEOUT_MS);
};
vi.mock("@formbricks/jobs", () => ({
startJobsRuntime: mockStartJobsRuntime,
@@ -33,7 +37,7 @@ describe("instrumentation-jobs", () => {
await resetJobsWorkerRegistrationForTests();
});
test("skips worker startup when disabled", async () => {
slowTest("skips worker startup when disabled", async () => {
mockGetJobsWorkerBootstrapConfig.mockReturnValue({
enabled: false,
runtimeOptions: null,
@@ -47,7 +51,7 @@ describe("instrumentation-jobs", () => {
expect(mockDebug).toHaveBeenCalledWith("BullMQ worker startup skipped");
});
test("starts the worker only once", async () => {
slowTest("starts the worker only once", async () => {
const mockRuntime = {
close: vi.fn().mockResolvedValue(undefined),
};
@@ -77,7 +81,7 @@ describe("instrumentation-jobs", () => {
});
});
test("logs and rethrows startup failures", async () => {
slowTest("logs and rethrows startup failures", async () => {
const startupError = new Error("startup failed");
mockGetJobsWorkerBootstrapConfig.mockReturnValue({

View File

@@ -3,14 +3,13 @@ import { startJobsRuntime } from "@formbricks/jobs";
import { logger } from "@formbricks/logger";
import { getJobsWorkerBootstrapConfig } from "@/lib/jobs/config";
const globalForJobsRuntime = globalThis as {
formbricksJobsRuntime: JobsRuntimeHandle | undefined;
formbricksJobsRuntimeInitializing: Promise<JobsRuntimeHandle> | undefined;
} as unknown as {
type TJobsRuntimeGlobal = typeof globalThis & {
formbricksJobsRuntime: JobsRuntimeHandle | undefined;
formbricksJobsRuntimeInitializing: Promise<JobsRuntimeHandle> | undefined;
};
const globalForJobsRuntime = globalThis as TJobsRuntimeGlobal;
export const registerJobsWorker = async (): Promise<JobsRuntimeHandle | null> => {
const jobsWorkerBootstrapConfig = getJobsWorkerBootstrapConfig();

View File

@@ -1,5 +1,6 @@
import * as Sentry from "@sentry/nextjs";
import { type Instrumentation } from "next";
import { logger } from "@formbricks/logger";
import { isExpectedError } from "@formbricks/types/errors";
import { IS_PRODUCTION, PROMETHEUS_ENABLED, SENTRY_DSN } from "@/lib/constants";
@@ -22,8 +23,12 @@ export const register = async () => {
await import("./instrumentation-node");
}
const { registerJobsWorker } = await import("./instrumentation-jobs");
await registerJobsWorker();
try {
const { registerJobsWorker } = await import("./instrumentation-jobs");
await registerJobsWorker();
} catch (error) {
logger.error({ err: error }, "BullMQ worker registration failed during Next.js instrumentation");
}
}
// Sentry init loads after OTEL to avoid TracerProvider conflicts
// Sentry tracing is disabled (tracesSampleRate: 0) -- SigNoz handles distributed tracing

33
apps/web/lib/env.test.ts Normal file
View File

@@ -0,0 +1,33 @@
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
// Snapshot of the environment at module load, restored by afterEach hooks.
const ORIGINAL_ENV = process.env;

/**
 * Replaces `process.env` with a copy of the original environment plus
 * test-friendly defaults (NODE_ENV, DATABASE_URL, ENCRYPTION_KEY).
 * Callers may override or add any key — or unset one by passing
 * `undefined` — via `overrides`, which takes precedence over the defaults.
 */
const setTestEnv = (overrides: Record<string, string | undefined> = {}) => {
  const defaults: Record<string, string | undefined> = {
    NODE_ENV: "test",
    DATABASE_URL: "https://example.com/db",
    ENCRYPTION_KEY: "12345678901234567890123456789012",
  };
  process.env = Object.assign({}, ORIGINAL_ENV, defaults, overrides);
};
describe("env", () => {
  beforeEach(() => {
    // Drop the module cache so each test re-imports ./env and re-runs its
    // schema validation against the freshly mutated process.env.
    vi.resetModules();
  });
  afterEach(() => {
    // Restore the real environment so later suites see untouched values.
    process.env = ORIGINAL_ENV;
  });
  test("allows ambient DEBUG values from external tooling", async () => {
    // DEBUG is a free-form string in the env schema: ambient CI/tooling
    // values such as "pnpm:*" must not fail validation.
    setTestEnv({
      DEBUG: "pnpm:*",
    });
    const { env } = await import("./env");
    expect(env.DEBUG).toBe("pnpm:*");
  });
});

View File

@@ -15,7 +15,9 @@ export const env = createEnv({
BREVO_API_KEY: z.string().optional(),
BREVO_LIST_ID: z.string().optional(),
DATABASE_URL: z.url(),
DEBUG: z.enum(["1", "0"]).optional(),
// DEBUG is a common ambient env var in CI/tooling, so we accept arbitrary strings here
// and only treat "1" as enabling Formbricks-specific debug behavior downstream.
DEBUG: z.string().optional(),
AUTH_DEFAULT_TEAM_ID: z.string().optional(),
AUTH_SKIP_INVITE_FOR_SSO: z.enum(["1", "0"]).optional(),
BULLMQ_WORKER_CONCURRENCY: z.coerce.number().int().min(1).optional(),

View File

@@ -69,4 +69,23 @@ describe("jobs runtime config", () => {
},
});
});
// Covers the BULLMQ_WORKER_ENABLED="0" kill switch: even with an otherwise
// complete production-like config, bootstrap must report the worker disabled.
test("respects explicit worker disable flag outside tests", async () => {
  // doMock (not mock) so this env override only affects this test's import.
  vi.doMock("@/lib/env", () => ({
    env: {
      BULLMQ_WORKER_CONCURRENCY: 6,
      BULLMQ_WORKER_COUNT: 3,
      BULLMQ_WORKER_ENABLED: "0",
      NODE_ENV: "production",
      REDIS_URL: "redis://cache.internal:6379",
    },
  }));
  const { getJobsWorkerBootstrapConfig } = await import("./config");
  expect(getJobsWorkerBootstrapConfig()).toEqual({
    enabled: false,
    runtimeOptions: null,
  });
});
});

View File

@@ -78,6 +78,12 @@ describe("@formbricks/jobs connection helpers", () => {
expect(() => getRedisUrlFromEnv()).toThrow("REDIS_URL is required for BullMQ");
});
// Guards the URL.canParse check in getRedisUrlFromEnv: the variable being
// set is not enough — its value must also parse as a URL.
test("throws when REDIS_URL is malformed", () => {
  process.env.REDIS_URL = "not-a-url";
  expect(() => getRedisUrlFromEnv()).toThrow("REDIS_URL must be a valid URL for BullMQ");
});
test("quits an active Redis connection", async () => {
await closeRedisConnection({
quit: mockQuit.mockResolvedValue(undefined),
@@ -88,4 +94,15 @@ describe("@formbricks/jobs connection helpers", () => {
expect(mockQuit).toHaveBeenCalledTimes(1);
expect(mockDisconnect).not.toHaveBeenCalled();
});
// Mirrors closeRedisConnection's `status !== "ready"` branch: non-ready
// connections are torn down with disconnect() and quit() is never attempted.
test("disconnects a non-ready Redis connection", async () => {
  await closeRedisConnection({
    quit: mockQuit,
    disconnect: mockDisconnect,
    status: "connecting",
    // Cast: only the fields the helper reads are stubbed, not a full client.
  } as unknown as IORedis);
  expect(mockQuit).not.toHaveBeenCalled();
  expect(mockDisconnect).toHaveBeenCalledTimes(1);
});
});

View File

@@ -55,6 +55,10 @@ export const getRedisUrlFromEnv = (): string => {
throw new Error("REDIS_URL is required for BullMQ");
}
if (!URL.canParse(redisUrl)) {
throw new Error("REDIS_URL must be a valid URL for BullMQ");
}
return redisUrl;
};
@@ -63,6 +67,11 @@ export const closeRedisConnection = async (connection: IORedis): Promise<void> =
return;
}
if (connection.status !== "ready") {
connection.disconnect();
return;
}
try {
await connection.quit();
} catch {

View File

@@ -18,13 +18,22 @@ let queueEventsConnection: IORedis | null = null;
let isRedisAvailable = false;
async function isQueueReady(url: string): Promise<boolean> {
let probe: Awaited<ReturnType<typeof startJobsRuntime>> | null = null;
try {
const probe = await startJobsRuntime({ redisUrl: url });
await probe.close();
probe = await startJobsRuntime({ redisUrl: url });
return true;
} catch (error) {
logger.info({ error }, "BullMQ integration tests skipped because Redis is not available");
return false;
} finally {
if (probe) {
try {
await probe.close();
} catch (error) {
logger.warn({ err: error }, "Failed to close BullMQ runtime probe cleanly");
}
}
}
}

View File

@@ -47,6 +47,49 @@ describe("@formbricks/jobs processor registry", () => {
);
});
// The response pipeline processor is intentionally unimplemented: it must
// log an error carrying the job context and reject, so the queue records
// the job as failed instead of silently succeeding.
test("fails fast for the unimplemented response pipeline processor", async () => {
  await expect(
    processJob({
      attemptsMade: 0,
      data: {
        environmentId: "env_123",
        event: "responseCreated",
        // Minimal response payload — presumably shaped to satisfy ZResponse;
        // verify against @formbricks/types/responses if fields change.
        response: {
          contact: null,
          contactAttributes: null,
          createdAt: new Date("2026-04-07T10:00:00.000Z"),
          data: {},
          displayId: null,
          endingId: null,
          finished: false,
          id: "cm8cmpnjj000108jfdr9dfqe6",
          language: null,
          meta: {},
          singleUseId: null,
          surveyId: "cm8cmpnjj000108jfdr9dfqe7",
          tags: [],
          updatedAt: new Date("2026-04-07T10:00:00.000Z"),
          variables: {},
        },
        surveyId: "survey_123",
      },
      id: "job-3",
      name: JOB_NAMES.responsePipeline,
      queueName: "background-jobs",
      // Cast: only the fields processJob reads are provided, not a full
      // bullmq Job instance.
    } as never)
  ).rejects.toThrow("Unimplemented response pipeline processor");
  expect(mockError).toHaveBeenCalledWith(
    expect.objectContaining({
      environmentId: "env_123",
      jobId: "job-3",
      jobName: JOB_NAMES.responsePipeline,
      surveyId: "survey_123",
    }),
    "Unimplemented response pipeline processor"
  );
});
test("throws for unknown jobs", async () => {
await expect(
processJob({

View File

@@ -3,7 +3,7 @@ import type { JobHandler } from "@/src/contracts";
import type { TResponsePipelineJobData } from "@/src/types";
export const processResponsePipelineJob: JobHandler<TResponsePipelineJobData> = (data, context) => {
logger.debug(
logger.error(
{
attempt: context.attempt,
environmentId: data.environmentId,
@@ -13,8 +13,10 @@ export const processResponsePipelineJob: JobHandler<TResponsePipelineJobData> =
jobName: context.jobName,
queueName: context.queueName,
},
"Processed placeholder response pipeline job"
"Unimplemented response pipeline processor"
);
return Promise.resolve();
throw new Error(
`Unimplemented response pipeline processor for job ${context.jobId} (${data.environmentId}/${data.surveyId})`
);
};

View File

@@ -21,12 +21,14 @@ import { getRecurringJobSchedulerId } from "./schedules";
const {
mockCloseRedisConnection,
mockLoggerError,
mockQueueAdd,
mockQueueClose,
mockQueueUpsertJobScheduler,
mockQueueWaitUntilReady,
} = vi.hoisted(() => ({
mockCloseRedisConnection: vi.fn(),
mockLoggerError: vi.fn(),
mockQueueAdd: vi.fn(),
mockQueueClose: vi.fn(),
mockQueueUpsertJobScheduler: vi.fn(),
@@ -42,7 +44,7 @@ const mockConnection = {
vi.mock("@formbricks/logger", () => ({
logger: {
error: vi.fn(),
error: mockLoggerError,
info: vi.fn(),
warn: vi.fn(),
debug: vi.fn(),
@@ -221,4 +223,18 @@ describe("@formbricks/jobs queue helpers", () => {
expect(mockQueueClose).toHaveBeenCalledTimes(1);
expect(mockCloseRedisConnection).toHaveBeenCalledWith(mockConnection);
});
// Even when Queue#close rejects, the reset must still release the Redis
// connection and report the failure through logger.error (see the try/catch
// in resetJobsQueueFactory) rather than rethrowing.
test("keeps cleaning up when queue shutdown fails during reset", async () => {
  await getJobsQueue();
  mockQueueClose.mockRejectedValueOnce(new Error("queue close failed"));
  await expect(resetJobsQueueFactory()).resolves.toBeUndefined();
  expect(mockQueueClose).toHaveBeenCalledTimes(1);
  expect(mockCloseRedisConnection).toHaveBeenCalledWith(mockConnection);
  expect(mockLoggerError).toHaveBeenCalledTimes(1);
  // Assert both the structured context ({ err }) and the exact message.
  const [context, message] = mockLoggerError.mock.calls[0] as [{ err: Error }, string];
  expect(context.err).toBeInstanceOf(Error);
  expect(message).toBe("Failed to close BullMQ producer queue during reset");
});
});

View File

@@ -293,12 +293,20 @@ export const getBackgroundJobProducer = (): BackgroundJobProducer => ({
});
export const resetJobsQueueFactory = async (): Promise<void> => {
if (queueSingleton) {
await queueSingleton.close();
try {
if (queueSingleton) {
await queueSingleton.close();
}
} catch (error) {
logger.error({ err: error }, "Failed to close BullMQ producer queue during reset");
}
if (connectionSingleton) {
await closeRedisConnection(connectionSingleton);
try {
if (connectionSingleton) {
await closeRedisConnection(connectionSingleton);
}
} catch (error) {
logger.error({ err: error }, "Failed to close BullMQ producer connection during reset");
}
queueSingleton = undefined;

View File

@@ -120,11 +120,15 @@ export const startJobsRuntime = async ({
};
const handleSigterm = (): void => {
void closeRuntime();
closeRuntime().catch((error: unknown) => {
logger.error({ err: error }, "BullMQ shutdown failed in closeRuntime after SIGTERM");
});
};
const handleSigint = (): void => {
void closeRuntime();
closeRuntime().catch((error: unknown) => {
logger.error({ err: error }, "BullMQ shutdown failed in closeRuntime after SIGINT");
});
};
try {

View File

@@ -14,6 +14,9 @@ const ZScheduleWindow = {
startAt: ZValidDate.optional(),
} as const;
/**
 * A schedule window is valid when either bound is absent, or when the end
 * strictly follows the start. Shared by the `every` and `cron` schedule
 * schema refinements ("endAt must be after startAt").
 */
const hasValidScheduleWindow = (value: { startAt?: Date; endAt?: Date }): boolean => {
  const { startAt, endAt } = value;
  // An open-ended window (missing either bound) is always acceptable.
  if (!startAt || !endAt) {
    return true;
  }
  return endAt.getTime() > startAt.getTime();
};
// Non-empty, whitespace-trimmed identifiers used to register and scope
// recurring background job schedules.
export const ZBackgroundJobScheduleId = z.string().trim().min(1);
export const ZBackgroundJobScheduleScope = z.string().trim().min(1);
export const ZBackgroundJobScheduleIdentity = z.object({
@@ -35,7 +38,7 @@ export const ZRecurringEveryBackgroundJobSchedule = z
everyMs: ZPositiveInteger,
kind: z.literal("every"),
})
.refine((value) => !value.startAt || !value.endAt || value.endAt.getTime() > value.startAt.getTime(), {
.refine(hasValidScheduleWindow, {
message: "endAt must be after startAt",
path: ["endAt"],
});
@@ -48,7 +51,7 @@ export const ZRecurringCronBackgroundJobSchedule = z
kind: z.literal("cron"),
timeZone: z.string().trim().min(1).optional(),
})
.refine((value) => !value.startAt || !value.endAt || value.endAt.getTime() > value.startAt.getTime(), {
.refine(hasValidScheduleWindow, {
message: "endAt must be after startAt",
path: ["endAt"],
});

View File

@@ -1,5 +1,4 @@
import { z } from "zod";
import { ZWebhook } from "@formbricks/database/zod/webhooks";
import { ZResponse } from "@formbricks/types/responses";
export const ZTestLogJobData = z.object({
@@ -10,8 +9,12 @@ export const ZTestLogJobData = z.object({
export type TTestLogJobData = z.infer<typeof ZTestLogJobData>;
// Response lifecycle events accepted by the response pipeline queue;
// defined locally instead of borrowing the webhook trigger schema.
export const ZResponsePipelineEvent = z.enum(["responseFinished", "responseCreated", "responseUpdated"]);
export type TResponsePipelineEvent = z.infer<typeof ZResponsePipelineEvent>;
export const ZResponsePipelineJobData = z.object({
event: ZWebhook.shape.triggers.element,
event: ZResponsePipelineEvent,
response: ZResponse,
environmentId: z.string().min(1),
surveyId: z.string().min(1),

View File

@@ -30,7 +30,8 @@ export default defineConfig({
plugins: [
dts({
include: ["src/**/*"],
entryRoot: ".",
exclude: ["src/**/*.test.ts"],
entryRoot: "src",
outDir: "dist",
}),
],

View File

@@ -1,6 +1,5 @@
import { S3Client, type S3ClientConfig } from "@aws-sdk/client-s3";
import { logger } from "@formbricks/logger";
import { type Result, type StorageError, StorageErrorCode, err, ok } from "../types/error";
import {
S3_ACCESS_KEY,
S3_BUCKET_NAME,
@@ -9,6 +8,7 @@ import {
S3_REGION,
S3_SECRET_KEY,
} from "./constants";
import { type Result, type StorageError, StorageErrorCode, err, ok } from "./types/error";
// Cached singleton instance of S3Client
let cachedS3Client: S3Client | undefined;

View File

@@ -6,4 +6,4 @@ export {
getFileStream,
type FileStreamResult,
} from "./service";
export { type StorageError, StorageErrorCode } from "../types/error";
export { type StorageError, StorageErrorCode } from "./types/error";

View File

@@ -13,9 +13,9 @@ import {
} from "@aws-sdk/s3-presigned-post";
import { getSignedUrl } from "@aws-sdk/s3-request-presigner";
import { logger } from "@formbricks/logger";
import { type Result, type StorageError, StorageErrorCode, err, ok } from "../types/error";
import { createS3Client } from "./client";
import { S3_BUCKET_NAME } from "./constants";
import { type Result, type StorageError, StorageErrorCode, err, ok } from "./types/error";
/**
* Get a signed URL for uploading a file to S3

View File

@@ -8,5 +8,5 @@
},
"exclude": ["node_modules"],
"extends": "@formbricks/config-typescript/js-library.json",
"include": ["src", "package.json", "types"]
"include": ["src", "package.json"]
}

View File

@@ -19,6 +19,7 @@ export default defineConfig({
"@formbricks/logger",
],
},
emptyOutDir: false,
},
test: {
environment: "node",
@@ -29,5 +30,11 @@ export default defineConfig({
include: ["src/**/*.ts"],
},
},
plugins: [dts({ rollupTypes: true }) as PluginOption],
plugins: [
dts({
include: ["src/**/*"],
entryRoot: ".",
outDir: "dist",
}) as PluginOption,
],
});