chore: address PR comments

This commit is contained in:
Tiago Farto
2026-04-09 11:26:12 +03:00
parent ebaa2d363c
commit 92bd9bdac7
11 changed files with 153 additions and 228 deletions

View File

@@ -3,6 +3,7 @@ import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
const mockStartJobsRuntime = vi.fn();
const mockDebug = vi.fn();
const mockError = vi.fn();
const mockWarn = vi.fn();
const mockGetJobsWorkerBootstrapConfig = vi.fn();
const TEST_TIMEOUT_MS = 15_000;
const slowTest = (name: string, fn: () => Promise<void>): void => {
@@ -22,7 +23,7 @@ vi.mock("@formbricks/logger", () => ({
debug: mockDebug,
error: mockError,
info: vi.fn(),
warn: vi.fn(),
warn: mockWarn,
},
}));
@@ -30,11 +31,13 @@ describe("instrumentation-jobs", () => {
beforeEach(() => {
vi.resetModules();
vi.clearAllMocks();
vi.useFakeTimers();
});
afterEach(async () => {
const { resetJobsWorkerRegistrationForTests } = await import("./instrumentation-jobs");
await resetJobsWorkerRegistrationForTests();
vi.useRealTimers();
});
slowTest("skips worker startup when disabled", async () => {
@@ -132,6 +135,36 @@ describe("instrumentation-jobs", () => {
await expect(registerJobsWorker()).rejects.toThrow("startup failed");
expect(mockError).toHaveBeenCalledWith({ err: startupError }, "BullMQ worker registration failed");
expect(mockWarn).toHaveBeenCalledWith(
{ retryDelayMs: 30_000 },
"BullMQ worker registration retry scheduled"
);
});
slowTest("retries worker startup after a transient failure", async () => {
const startupError = new Error("startup failed");
const recoveredRuntime = {
close: vi.fn().mockResolvedValue(undefined),
};
mockGetJobsWorkerBootstrapConfig.mockReturnValue({
enabled: true,
runtimeOptions: {
concurrency: 1,
redisUrl: "redis://localhost:6379",
workerCount: 1,
},
});
mockStartJobsRuntime.mockRejectedValueOnce(startupError).mockResolvedValueOnce(recoveredRuntime);
const { registerJobsWorker } = await import("./instrumentation-jobs");
await expect(registerJobsWorker()).rejects.toThrow("startup failed");
await vi.advanceTimersByTimeAsync(30_000);
expect(mockStartJobsRuntime).toHaveBeenCalledTimes(2);
});
slowTest("clears registration state even when reset close fails", async () => {

View File

@@ -3,17 +3,45 @@ import { startJobsRuntime } from "@formbricks/jobs";
import { logger } from "@formbricks/logger";
import { getJobsWorkerBootstrapConfig } from "@/lib/jobs/config";
const WORKER_STARTUP_RETRY_DELAY_MS = 30_000;
type TJobsRuntimeGlobal = typeof globalThis & {
formbricksJobsRuntime: JobsRuntimeHandle | undefined;
formbricksJobsRuntimeInitializing: Promise<JobsRuntimeHandle> | undefined;
formbricksJobsRuntimeRetryTimeout: ReturnType<typeof setTimeout> | undefined;
};
const globalForJobsRuntime = globalThis as TJobsRuntimeGlobal;
const clearJobsWorkerRetryTimeout = (): void => {
if (globalForJobsRuntime.formbricksJobsRuntimeRetryTimeout) {
clearTimeout(globalForJobsRuntime.formbricksJobsRuntimeRetryTimeout);
globalForJobsRuntime.formbricksJobsRuntimeRetryTimeout = undefined;
}
};
const scheduleJobsWorkerRetry = (): void => {
if (
globalForJobsRuntime.formbricksJobsRuntime ||
globalForJobsRuntime.formbricksJobsRuntimeInitializing ||
globalForJobsRuntime.formbricksJobsRuntimeRetryTimeout
) {
return;
}
globalForJobsRuntime.formbricksJobsRuntimeRetryTimeout = setTimeout(() => {
globalForJobsRuntime.formbricksJobsRuntimeRetryTimeout = undefined;
void registerJobsWorker().catch(() => undefined);
}, WORKER_STARTUP_RETRY_DELAY_MS);
logger.warn({ retryDelayMs: WORKER_STARTUP_RETRY_DELAY_MS }, "BullMQ worker registration retry scheduled");
};
export const registerJobsWorker = async (): Promise<JobsRuntimeHandle | null> => {
const jobsWorkerBootstrapConfig = getJobsWorkerBootstrapConfig();
if (!jobsWorkerBootstrapConfig.enabled || !jobsWorkerBootstrapConfig.runtimeOptions) {
clearJobsWorkerRetryTimeout();
logger.debug("BullMQ worker startup skipped");
return null;
}
@@ -29,6 +57,7 @@ export const registerJobsWorker = async (): Promise<JobsRuntimeHandle | null> =>
globalForJobsRuntime.formbricksJobsRuntimeInitializing = startJobsRuntime(
jobsWorkerBootstrapConfig.runtimeOptions
).then((runtime) => {
clearJobsWorkerRetryTimeout();
globalForJobsRuntime.formbricksJobsRuntime = runtime;
globalForJobsRuntime.formbricksJobsRuntimeInitializing = undefined;
return runtime;
@@ -39,12 +68,14 @@ export const registerJobsWorker = async (): Promise<JobsRuntimeHandle | null> =>
} catch (error) {
globalForJobsRuntime.formbricksJobsRuntimeInitializing = undefined;
logger.error({ err: error }, "BullMQ worker registration failed");
scheduleJobsWorkerRetry();
throw error;
}
};
export const resetJobsWorkerRegistrationForTests = async (): Promise<void> => {
const runtime = globalForJobsRuntime.formbricksJobsRuntime;
clearJobsWorkerRetryTimeout();
globalForJobsRuntime.formbricksJobsRuntime = undefined;
globalForJobsRuntime.formbricksJobsRuntimeInitializing = undefined;

View File

@@ -0,0 +1,46 @@
import { beforeEach, describe, expect, test, vi } from "vitest";
const mockRegisterJobsWorker = vi.fn();
vi.mock("@sentry/nextjs", () => ({
captureRequestError: vi.fn(),
}));
vi.mock("@/lib/constants", () => ({
IS_PRODUCTION: false,
PROMETHEUS_ENABLED: false,
SENTRY_DSN: undefined,
}));
vi.mock("./instrumentation-jobs", () => ({
registerJobsWorker: mockRegisterJobsWorker,
}));
describe("instrumentation register", () => {
beforeEach(() => {
vi.resetModules();
vi.clearAllMocks();
process.env.NEXT_RUNTIME = "nodejs";
delete process.env.OTEL_EXPORTER_OTLP_ENDPOINT;
});
test("does not block Next.js boot on BullMQ worker startup", async () => {
mockRegisterJobsWorker.mockReturnValue(new Promise(() => undefined));
const { register } = await import("./instrumentation");
await expect(register()).resolves.toBeUndefined();
expect(mockRegisterJobsWorker).toHaveBeenCalledTimes(1);
});
test("swallows BullMQ worker startup rejections after triggering background registration", async () => {
mockRegisterJobsWorker.mockRejectedValue(new Error("startup failed"));
const { register } = await import("./instrumentation");
await expect(register()).resolves.toBeUndefined();
await Promise.resolve();
expect(mockRegisterJobsWorker).toHaveBeenCalledTimes(1);
});
});

View File

@@ -25,7 +25,7 @@ export const register = async () => {
try {
const { registerJobsWorker } = await import("./instrumentation-jobs");
await registerJobsWorker();
void registerJobsWorker().catch(() => undefined);
} catch (error) {
logger.error({ err: error }, "BullMQ worker registration failed during Next.js instrumentation");
}

View File

@@ -4,7 +4,7 @@ import type {
TRecurringBackgroundJobSchedule,
TRunAtBackgroundJobSchedule,
} from "@/src/schedules";
import type { TResponsePipelineJobData, TTestLogJobData } from "@/src/types";
import type { TTestLogJobData } from "@/src/types";
export interface JobExecutionContext {
attempt: number;
@@ -49,18 +49,8 @@ export const toAnyBackgroundJobDefinition = <TData>(
});
export interface BackgroundJobProducer {
enqueueResponsePipeline: (data: TResponsePipelineJobData) => Promise<EnqueuedJob>;
enqueueTestLog: (data: TTestLogJobData) => Promise<EnqueuedJob>;
scheduleResponsePipelineAt: (
schedule: TRunAtBackgroundJobSchedule,
data: TResponsePipelineJobData
) => Promise<EnqueuedJob>;
scheduleTestLogAt: (schedule: TRunAtBackgroundJobSchedule, data: TTestLogJobData) => Promise<EnqueuedJob>;
upsertRecurringResponsePipelineSchedule: (
identity: TBackgroundJobScheduleIdentity,
schedule: TRecurringBackgroundJobSchedule,
data: TResponsePipelineJobData
) => Promise<UpsertedRecurringJobSchedule>;
upsertRecurringTestLogSchedule: (
identity: TBackgroundJobScheduleIdentity,
schedule: TRecurringBackgroundJobSchedule,

View File

@@ -4,12 +4,14 @@ import * as jobs from "./index";
describe("@formbricks/jobs public API", () => {
test("exports the supported public entry points without leaking registry internals", () => {
expect(jobs.enqueueTestLogJob).toBeTypeOf("function");
expect(jobs.enqueueResponsePipelineJob).toBeTypeOf("function");
expect(jobs.getBackgroundJobProducer).toBeTypeOf("function");
expect(jobs.startJobsRuntime).toBeTypeOf("function");
expect(jobs.getBackgroundJobDefinition).toBeTypeOf("function");
expect(jobs.ZResponsePipelineJobData).toBeDefined();
expect(jobs.ZTestLogJobData).toBeDefined();
expect("enqueueResponsePipelineJob" in jobs).toBe(false);
expect("scheduleResponsePipelineJobAt" in jobs).toBe(false);
expect("upsertRecurringResponsePipelineJobSchedule" in jobs).toBe(false);
expect("getJobProcessor" in jobs).toBe(false);
expect("jobProcessors" in jobs).toBe(false);
expect("processJob" in jobs).toBe(false);

View File

@@ -17,14 +17,11 @@ export type {
export { backgroundJobDefinitions, getBackgroundJobDefinition } from "./definitions";
export {
createJobsQueue,
enqueueResponsePipelineJob,
enqueueTestLogJob,
getBackgroundJobProducer,
getJobsQueue,
resetJobsQueueFactory,
scheduleResponsePipelineJobAt,
scheduleTestLogJobAt,
upsertRecurringResponsePipelineJobSchedule,
upsertRecurringTestLogJobSchedule,
} from "./queue";
export { processResponsePipelineJob } from "./processors/response-pipeline";

View File

@@ -10,14 +10,11 @@ import {
} from "./constants";
import {
createJobsQueue,
enqueueResponsePipelineJob,
enqueueTestLogJob,
getBackgroundJobProducer,
getJobsQueue,
resetJobsQueueFactory,
scheduleResponsePipelineJobAt,
scheduleTestLogJobAt,
upsertRecurringResponsePipelineJobSchedule,
upsertRecurringTestLogJobSchedule,
} from "./queue";
import { getRecurringJobSchedulerId } from "./schedules";
@@ -45,29 +42,6 @@ const mockConnection = {
status: "ready",
} as unknown as IORedis;
const responsePipelineJobData = {
environmentId: "env_123",
event: "responseCreated" as const,
response: {
contact: null,
contactAttributes: null,
createdAt: new Date("2026-04-07T10:00:00.000Z"),
data: {},
displayId: null,
endingId: null,
finished: false,
id: "cm8cmpnjj000108jfdr9dfqe6",
language: null,
meta: {},
singleUseId: null,
surveyId: "cm8cmpnjj000108jfdr9dfqe7",
tags: [],
updatedAt: new Date("2026-04-07T10:00:00.000Z"),
variables: {},
},
surveyId: "survey_123",
};
vi.mock("@formbricks/logger", () => ({
logger: {
error: mockLoggerError,
@@ -139,16 +113,6 @@ describe("@formbricks/jobs queue helpers", () => {
expect(mockQueueAdd).toHaveBeenCalledWith(JOB_NAMES.testLog, { message: "hello world" }, undefined);
});
test("enqueues the response pipeline job with the shared queue", async () => {
const mockJob = { id: "job-response-1" };
mockQueueAdd.mockResolvedValue(mockJob);
const job = await enqueueResponsePipelineJob(responsePipelineJobData);
expect(job).toBe(mockJob);
expect(mockQueueAdd).toHaveBeenCalledWith(JOB_NAMES.responsePipeline, responsePipelineJobData, undefined);
});
test("exposes an engine-neutral producer interface", async () => {
const producer = getBackgroundJobProducer();
mockQueueAdd.mockResolvedValue({
@@ -166,21 +130,12 @@ describe("@formbricks/jobs queue helpers", () => {
});
});
test("exposes response pipeline enqueues through the engine-neutral producer interface", async () => {
test("does not expose response pipeline enqueues through the engine-neutral producer interface", () => {
const producer = getBackgroundJobProducer();
mockQueueAdd.mockResolvedValue({
id: "job-response-1",
name: JOB_NAMES.responsePipeline,
queueName: JOBS_QUEUE_NAME,
});
const job = await producer.enqueueResponsePipeline(responsePipelineJobData);
expect(job).toEqual({
jobId: "job-response-1",
jobName: JOB_NAMES.responsePipeline,
queueName: JOBS_QUEUE_NAME,
});
expect("enqueueResponsePipeline" in producer).toBe(false);
expect("scheduleResponsePipelineAt" in producer).toBe(false);
expect("upsertRecurringResponsePipelineSchedule" in producer).toBe(false);
});
test("schedules a delayed job using the runAt schedule type", async () => {
@@ -198,19 +153,6 @@ describe("@formbricks/jobs queue helpers", () => {
);
});
test("schedules a delayed response pipeline job", async () => {
mockQueueAdd.mockResolvedValue({ id: "job-response-2" });
await scheduleResponsePipelineJobAt(
{ runAt: new Date("2026-04-07T10:00:05.000Z") },
responsePipelineJobData
);
expect(mockQueueAdd).toHaveBeenCalledWith(JOB_NAMES.responsePipeline, responsePipelineJobData, {
delay: 5000,
});
});
test("upserts a recurring scheduler using engine-neutral schedule types", async () => {
mockQueueUpsertJobScheduler.mockResolvedValue({
id: "job-4",
@@ -281,26 +223,6 @@ describe("@formbricks/jobs queue helpers", () => {
});
});
test("exposes response pipeline scheduling through the engine-neutral producer interface", async () => {
const producer = getBackgroundJobProducer();
mockQueueAdd.mockResolvedValue({
id: "job-6",
name: JOB_NAMES.responsePipeline,
queueName: JOBS_QUEUE_NAME,
});
const scheduledJob = await producer.scheduleResponsePipelineAt(
{ runAt: new Date("2026-04-07T10:00:05.000Z") },
responsePipelineJobData
);
expect(scheduledJob).toEqual({
jobId: "job-6",
jobName: JOB_NAMES.responsePipeline,
queueName: JOBS_QUEUE_NAME,
});
});
test("exposes test log scheduling through the engine-neutral producer interface", async () => {
const producer = getBackgroundJobProducer();
mockQueueAdd.mockResolvedValue({
@@ -321,74 +243,6 @@ describe("@formbricks/jobs queue helpers", () => {
});
});
test("upserts recurring response pipeline schedules", async () => {
mockQueueUpsertJobScheduler.mockResolvedValue({
id: "job-7",
name: JOB_NAMES.responsePipeline,
queueName: JOBS_QUEUE_NAME,
});
const scheduledJob = await upsertRecurringResponsePipelineJobSchedule(
{
scheduleId: "response-pipeline-recurring",
scope: "environment_123",
},
{
everyMs: 60_000,
kind: "every",
},
responsePipelineJobData
);
expect(mockQueueUpsertJobScheduler).toHaveBeenCalledWith(
getRecurringJobSchedulerId(JOB_NAMES.responsePipeline, {
scheduleId: "response-pipeline-recurring",
scope: "environment_123",
}),
{
endDate: undefined,
every: 60_000,
limit: undefined,
startDate: undefined,
},
{
data: responsePipelineJobData,
name: JOB_NAMES.responsePipeline,
opts: JOBS_DEFAULT_JOB_SCHEDULER_TEMPLATE_OPTIONS,
}
);
expect(scheduledJob.id).toBe("job-7");
});
test("exposes recurring response pipeline scheduling through the engine-neutral producer interface", async () => {
const producer = getBackgroundJobProducer();
mockQueueUpsertJobScheduler.mockResolvedValue({
id: "job-7b",
name: JOB_NAMES.responsePipeline,
queueName: JOBS_QUEUE_NAME,
});
const scheduledJob = await producer.upsertRecurringResponsePipelineSchedule(
{
scheduleId: "response-pipeline-recurring-producer",
scope: "environment_123",
},
{
everyMs: 60_000,
kind: "every",
},
responsePipelineJobData
);
expect(scheduledJob).toEqual({
jobId: "job-7b",
jobName: JOB_NAMES.responsePipeline,
queueName: JOBS_QUEUE_NAME,
scheduleId: "response-pipeline-recurring-producer",
scope: "environment_123",
});
});
test("rejects engine-neutral enqueues when BullMQ returns a job without an id", async () => {
const producer = getBackgroundJobProducer();
mockQueueAdd.mockResolvedValue({

View File

@@ -19,7 +19,7 @@ import {
getRecurringJobSchedulerId,
toBullMQRepeatOptions,
} from "@/src/schedules";
import { type TResponsePipelineJobData, type TTestLogJobData } from "@/src/types";
import { type TTestLogJobData } from "@/src/types";
export interface JobsQueueHandle {
connection: IORedis;
@@ -200,18 +200,6 @@ export const enqueueTestLogJob = async (data: TTestLogJobData): Promise<Job> =>
}
};
export const enqueueResponsePipelineJob = async (data: TResponsePipelineJobData): Promise<Job> => {
try {
return await enqueueBackgroundJob(JOB_NAMES.responsePipeline, data);
} catch (error) {
logger.error(
{ err: error, jobName: JOB_NAMES.responsePipeline },
"Failed to enqueue BullMQ response pipeline job"
);
throw error;
}
};
export const scheduleTestLogJobAt = async (
schedule: TRunAtBackgroundJobSchedule,
data: TTestLogJobData
@@ -227,21 +215,6 @@ export const scheduleTestLogJobAt = async (
}
};
export const scheduleResponsePipelineJobAt = async (
schedule: TRunAtBackgroundJobSchedule,
data: TResponsePipelineJobData
): Promise<Job> => {
try {
return await scheduleBackgroundJobAt(JOB_NAMES.responsePipeline, schedule, data);
} catch (error) {
logger.error(
{ err: error, jobName: JOB_NAMES.responsePipeline, schedule },
"Failed to schedule BullMQ response pipeline job"
);
throw error;
}
};
export const upsertRecurringTestLogJobSchedule = async (
identity: TBackgroundJobScheduleIdentity,
schedule: TRecurringBackgroundJobSchedule,
@@ -264,39 +237,9 @@ export const upsertRecurringTestLogJobSchedule = async (
}
};
export const upsertRecurringResponsePipelineJobSchedule = async (
identity: TBackgroundJobScheduleIdentity,
schedule: TRecurringBackgroundJobSchedule,
data: TResponsePipelineJobData
): Promise<Job> => {
try {
return await upsertRecurringBackgroundJobSchedule(JOB_NAMES.responsePipeline, identity, schedule, data);
} catch (error) {
logger.error(
{
err: error,
jobName: JOB_NAMES.responsePipeline,
schedule,
scheduleId: identity.scheduleId,
scope: identity.scope,
},
"Failed to upsert BullMQ response pipeline schedule"
);
throw error;
}
};
export const getBackgroundJobProducer = (): BackgroundJobProducer => ({
enqueueResponsePipeline: async (data) => toEnqueuedJob(await enqueueResponsePipelineJob(data)),
enqueueTestLog: async (data) => toEnqueuedJob(await enqueueTestLogJob(data)),
scheduleResponsePipelineAt: async (schedule, data) =>
toEnqueuedJob(await scheduleResponsePipelineJobAt(schedule, data)),
scheduleTestLogAt: async (schedule, data) => toEnqueuedJob(await scheduleTestLogJobAt(schedule, data)),
upsertRecurringResponsePipelineSchedule: async (identity, schedule, data) =>
toUpsertedRecurringJobSchedule(
await upsertRecurringResponsePipelineJobSchedule(identity, schedule, data),
identity
),
upsertRecurringTestLogSchedule: async (identity, schedule, data) =>
toUpsertedRecurringJobSchedule(
await upsertRecurringTestLogJobSchedule(identity, schedule, data),

View File

@@ -1,5 +1,9 @@
import { describe, expect, test } from "vitest";
import { getDelayForRunAtSchedule, getRecurringJobSchedulerId } from "./schedules";
import {
ZBackgroundJobScheduleIdentity,
getDelayForRunAtSchedule,
getRecurringJobSchedulerId,
} from "./schedules";
describe("@formbricks/jobs schedules", () => {
test("clamps small past drift to immediate execution", () => {
@@ -36,4 +40,20 @@ describe("@formbricks/jobs schedules", () => {
})
).toBe("response-pipeline.process:environment_123:daily-sync");
});
test("rejects reserved delimiters in recurring scheduler identities", () => {
expect(() =>
ZBackgroundJobScheduleIdentity.parse({
scheduleId: "daily:sync",
scope: "environment_123",
})
).toThrow(/reserved in recurring scheduler ids/);
expect(() =>
getRecurringJobSchedulerId("response-pipeline.process", {
scheduleId: "daily-sync",
scope: "environment:123",
})
).toThrow(/reserved in recurring scheduler ids/);
});
});

View File

@@ -7,6 +7,7 @@ const ZValidDate = z.date().refine((value) => !Number.isNaN(value.getTime()), {
const ZPositiveInteger = z.number().int().positive();
const MAX_RUN_AT_PAST_DRIFT_MS = 5_000;
const RESERVED_SCHEDULER_ID_DELIMITER = ":";
const ZScheduleWindow = {
endAt: ZValidDate.optional(),
@@ -17,8 +18,16 @@ const ZScheduleWindow = {
const hasValidScheduleWindow = (value: { startAt?: Date; endAt?: Date }): boolean =>
!value.startAt || !value.endAt || value.endAt.getTime() > value.startAt.getTime();
export const ZBackgroundJobScheduleId = z.string().trim().min(1);
export const ZBackgroundJobScheduleScope = z.string().trim().min(1);
const ZSchedulerKeySegment = z
.string()
.trim()
.min(1)
.refine((value) => !value.includes(RESERVED_SCHEDULER_ID_DELIMITER), {
message: `"${RESERVED_SCHEDULER_ID_DELIMITER}" is reserved in recurring scheduler ids`,
});
export const ZBackgroundJobScheduleId = ZSchedulerKeySegment;
export const ZBackgroundJobScheduleScope = ZSchedulerKeySegment;
export const ZBackgroundJobScheduleIdentity = z.object({
scheduleId: ZBackgroundJobScheduleId,
scope: ZBackgroundJobScheduleScope,