Merge branch 'feat/background_workers_v1' into chore/response-to-bullmq

This commit is contained in:
Tiago Farto
2026-04-09 11:43:30 +03:00
5 changed files with 115 additions and 12 deletions
+34 -1
View File
@@ -3,6 +3,7 @@ import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
const mockStartJobsRuntime = vi.fn();
const mockDebug = vi.fn();
const mockError = vi.fn();
const mockWarn = vi.fn();
const mockGetJobsWorkerBootstrapConfig = vi.fn();
const mockProcessResponsePipelineJob = vi.fn();
const TEST_TIMEOUT_MS = 15_000;
@@ -23,7 +24,7 @@ vi.mock("@formbricks/logger", () => ({
debug: mockDebug,
error: mockError,
info: vi.fn(),
warn: vi.fn(),
warn: mockWarn,
},
}));
@@ -35,11 +36,13 @@ describe("instrumentation-jobs", () => {
beforeEach(() => {
vi.resetModules();
vi.clearAllMocks();
vi.useFakeTimers();
});
afterEach(async () => {
const { resetJobsWorkerRegistrationForTests } = await import("./instrumentation-jobs");
await resetJobsWorkerRegistrationForTests();
vi.useRealTimers();
});
slowTest("skips worker startup when disabled", async () => {
@@ -165,6 +168,36 @@ describe("instrumentation-jobs", () => {
await expect(registerJobsWorker()).rejects.toThrow("startup failed");
expect(mockError).toHaveBeenCalledWith({ err: startupError }, "BullMQ worker registration failed");
expect(mockWarn).toHaveBeenCalledWith(
{ retryDelayMs: 30_000 },
"BullMQ worker registration retry scheduled"
);
});
slowTest("retries worker startup after a transient failure", async () => {
const startupError = new Error("startup failed");
const recoveredRuntime = {
close: vi.fn().mockResolvedValue(undefined),
};
mockGetJobsWorkerBootstrapConfig.mockReturnValue({
enabled: true,
runtimeOptions: {
concurrency: 1,
redisUrl: "redis://localhost:6379",
workerCount: 1,
},
});
mockStartJobsRuntime.mockRejectedValueOnce(startupError).mockResolvedValueOnce(recoveredRuntime);
const { registerJobsWorker } = await import("./instrumentation-jobs");
await expect(registerJobsWorker()).rejects.toThrow("startup failed");
await vi.advanceTimersByTimeAsync(30_000);
expect(mockStartJobsRuntime).toHaveBeenCalledTimes(2);
});
slowTest("clears registration state even when reset close fails", async () => {
+31
View File
@@ -3,18 +3,46 @@ import { logger } from "@formbricks/logger";
import { getJobsWorkerBootstrapConfig } from "@/lib/jobs/config";
import { processResponsePipelineJob } from "@/modules/response-pipeline/lib/process-response-pipeline-job";
const WORKER_STARTUP_RETRY_DELAY_MS = 30_000;
type TJobsRuntimeGlobal = typeof globalThis & {
formbricksJobsRuntime: JobsRuntimeHandle | undefined;
formbricksJobsRuntimeInitializing: Promise<JobsRuntimeHandle> | undefined;
formbricksJobsRuntimeRetryTimeout: ReturnType<typeof setTimeout> | undefined;
};
const globalForJobsRuntime = globalThis as TJobsRuntimeGlobal;
const RESPONSE_PIPELINE_JOB_NAME = "response-pipeline.process";
const clearJobsWorkerRetryTimeout = (): void => {
if (globalForJobsRuntime.formbricksJobsRuntimeRetryTimeout) {
clearTimeout(globalForJobsRuntime.formbricksJobsRuntimeRetryTimeout);
globalForJobsRuntime.formbricksJobsRuntimeRetryTimeout = undefined;
}
};
const scheduleJobsWorkerRetry = (): void => {
if (
globalForJobsRuntime.formbricksJobsRuntime ||
globalForJobsRuntime.formbricksJobsRuntimeInitializing ||
globalForJobsRuntime.formbricksJobsRuntimeRetryTimeout
) {
return;
}
globalForJobsRuntime.formbricksJobsRuntimeRetryTimeout = setTimeout(() => {
globalForJobsRuntime.formbricksJobsRuntimeRetryTimeout = undefined;
void registerJobsWorker().catch(() => undefined);
}, WORKER_STARTUP_RETRY_DELAY_MS);
logger.warn({ retryDelayMs: WORKER_STARTUP_RETRY_DELAY_MS }, "BullMQ worker registration retry scheduled");
};
export const registerJobsWorker = async (): Promise<JobsRuntimeHandle | null> => {
const jobsWorkerBootstrapConfig = getJobsWorkerBootstrapConfig();
if (!jobsWorkerBootstrapConfig.enabled || !jobsWorkerBootstrapConfig.runtimeOptions) {
clearJobsWorkerRetryTimeout();
logger.debug("BullMQ worker startup skipped");
return null;
}
@@ -35,6 +63,7 @@ export const registerJobsWorker = async (): Promise<JobsRuntimeHandle | null> =>
},
},
}).then((runtime) => {
clearJobsWorkerRetryTimeout();
globalForJobsRuntime.formbricksJobsRuntime = runtime;
globalForJobsRuntime.formbricksJobsRuntimeInitializing = undefined;
return runtime;
@@ -45,12 +74,14 @@ export const registerJobsWorker = async (): Promise<JobsRuntimeHandle | null> =>
} catch (error) {
globalForJobsRuntime.formbricksJobsRuntimeInitializing = undefined;
logger.error({ err: error }, "BullMQ worker registration failed");
scheduleJobsWorkerRetry();
throw error;
}
};
export const resetJobsWorkerRegistrationForTests = async (): Promise<void> => {
const runtime = globalForJobsRuntime.formbricksJobsRuntime;
clearJobsWorkerRetryTimeout();
globalForJobsRuntime.formbricksJobsRuntime = undefined;
globalForJobsRuntime.formbricksJobsRuntimeInitializing = undefined;
+46
View File
@@ -0,0 +1,46 @@
import { beforeEach, describe, expect, test, vi } from "vitest";
const mockRegisterJobsWorker = vi.fn();
vi.mock("@sentry/nextjs", () => ({
captureRequestError: vi.fn(),
}));
vi.mock("@/lib/constants", () => ({
IS_PRODUCTION: false,
PROMETHEUS_ENABLED: false,
SENTRY_DSN: undefined,
}));
vi.mock("./instrumentation-jobs", () => ({
registerJobsWorker: mockRegisterJobsWorker,
}));
describe("instrumentation register", () => {
beforeEach(() => {
vi.resetModules();
vi.clearAllMocks();
process.env.NEXT_RUNTIME = "nodejs";
delete process.env.OTEL_EXPORTER_OTLP_ENDPOINT;
});
test("does not block Next.js boot on BullMQ worker startup", async () => {
mockRegisterJobsWorker.mockReturnValue(new Promise(() => undefined));
const { register } = await import("./instrumentation");
await expect(register()).resolves.toBeUndefined();
expect(mockRegisterJobsWorker).toHaveBeenCalledTimes(1);
});
test("swallows BullMQ worker startup rejections after triggering background registration", async () => {
mockRegisterJobsWorker.mockRejectedValue(new Error("startup failed"));
const { register } = await import("./instrumentation");
await expect(register()).resolves.toBeUndefined();
await Promise.resolve();
expect(mockRegisterJobsWorker).toHaveBeenCalledTimes(1);
});
});
+1 -1
View File
@@ -25,7 +25,7 @@ export const register = async () => {
try {
const { registerJobsWorker } = await import("./instrumentation-jobs");
await registerJobsWorker();
void registerJobsWorker().catch(() => undefined);
} catch (error) {
logger.error({ err: error }, "BullMQ worker registration failed during Next.js instrumentation");
}
+3 -10
View File
@@ -426,20 +426,13 @@ describe("@formbricks/jobs queue helpers", () => {
expect(message).toBe("Failed to close BullMQ producer queue during reset");
});
test("keeps clearing global state when producer connection shutdown fails during reset", async () => {
test("clears memoized state after reset so a new queue can be created", async () => {
await getJobsQueue();
mockCloseRedisConnection.mockRejectedValueOnce(new Error("connection close failed"));
await expect(resetJobsQueueFactory()).resolves.toBeUndefined();
await resetJobsQueueFactory();
const nextQueueResult = await getJobsQueue();
expect(nextQueueResult.connection).toBe(mockConnection);
expect(nextQueueResult.queue).toBeDefined();
expect(mockLoggerError).toHaveBeenCalledTimes(1);
const secondLoggerCalls = mockLoggerError.mock.calls as [{ err: Error }, string][];
const [context, message] = secondLoggerCalls[0];
expect(context.err).toBeInstanceOf(Error);
expect(message).toBe("Failed to close BullMQ producer connection during reset");
expect(nextQueueResult.queue).toBeDefined();
expect(Queue).toHaveBeenCalledTimes(2);
});
});