Compare commits

...

4 Commits

Author SHA1 Message Date
Tiago Farto
f190cfc246 chore: align river implementation 2026-03-25 16:06:55 +00:00
Tiago Farto
b5b5da4fdc Merge branch 'main' into feat/workflows-service 2026-03-25 13:58:13 +00:00
Tiago Farto
c44e96e4ed chore: date format fix 2026-03-25 11:37:14 +00:00
Tiago Farto
90b26935a9 feat: River workflows POC 2026-03-25 10:53:46 +00:00
33 changed files with 2146 additions and 1919 deletions

3
.gitignore vendored
View File

@@ -66,3 +66,6 @@ i18n.cache
stats.html
# next-agents-md
.next-docs/
# Golang
.cache

View File

@@ -3,33 +3,40 @@ import { z } from "zod";
import { prisma } from "@formbricks/database";
import { logger } from "@formbricks/logger";
import { DatabaseError } from "@formbricks/types/errors";
import { deleteSurveyLifecycleJobs } from "@/lib/river/survey-lifecycle";
import { validateInputs } from "@/lib/utils/validate";
export const deleteSurvey = async (surveyId: string) => {
validateInputs([surveyId, z.cuid2()]);
try {
const deletedSurvey = await prisma.survey.delete({
where: {
id: surveyId,
},
include: {
segment: true,
triggers: {
include: {
actionClass: true,
const deletedSurvey = await prisma.$transaction(async (tx) => {
await deleteSurveyLifecycleJobs({ tx, surveyId });
const removedSurvey = await tx.survey.delete({
where: {
id: surveyId,
},
include: {
segment: true,
triggers: {
include: {
actionClass: true,
},
},
},
},
});
if (deletedSurvey.type === "app" && deletedSurvey.segment?.isPrivate) {
await prisma.segment.delete({
where: {
id: deletedSurvey.segment.id,
},
});
}
if (removedSurvey.type === "app" && removedSurvey.segment?.isPrivate) {
await tx.segment.delete({
where: {
id: removedSurvey.segment.id,
},
});
}
return removedSurvey;
});
return deletedSurvey;
} catch (error) {

View File

@@ -0,0 +1,10 @@
import "server-only";
// Postgres schema that holds the River job-queue tables (e.g. river.river_job).
export const RIVER_SCHEMA = "river";
// Queue name used for survey lifecycle (start/end) jobs.
export const RIVER_SURVEY_LIFECYCLE_QUEUE = "survey_lifecycle";
// Job "kind" discriminators for the two lifecycle transitions.
export const RIVER_SURVEY_START_KIND = "survey.start";
export const RIVER_SURVEY_END_KIND = "survey.end";
// Value written to river_job.max_attempts when inserting lifecycle jobs.
export const RIVER_SURVEY_LIFECYCLE_MAX_ATTEMPTS = 3;
// pg_notify channel suffix; the full channel is "<schema>.river_insert".
export const RIVER_INSERT_NOTIFICATION_CHANNEL = "river_insert";
// Job states considered "pending" (not yet completed) — only jobs in these
// states are removed when a survey's lifecycle jobs are deleted.
export const RIVER_PENDING_JOB_STATES = ["available", "scheduled", "retryable"] as const;

View File

@@ -0,0 +1,502 @@
import { Prisma } from "@prisma/client";
import type { PrismaClient } from "@prisma/client";
import { randomUUID } from "crypto";
import { Socket } from "net";
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, test, vi } from "vitest";
import { logger } from "@formbricks/logger";
import {
RIVER_INSERT_NOTIFICATION_CHANNEL,
RIVER_PENDING_JOB_STATES,
RIVER_SURVEY_END_KIND,
RIVER_SURVEY_LIFECYCLE_MAX_ATTEMPTS,
RIVER_SURVEY_LIFECYCLE_QUEUE,
RIVER_SURVEY_START_KIND,
} from "./constants";
import { deleteSurveyLifecycleJobs, enqueueSurveyLifecycleJobs } from "./survey-lifecycle";
vi.mock("server-only", () => ({}));
vi.mock("@formbricks/logger", () => ({
logger: {
error: vi.fn(),
},
}));
// Builds a minimal TransactionClient stub that exposes only the mocked
// $executeRaw used by the lifecycle helpers under test.
const createMockTx = () => {
  const stub = { $executeRaw: vi.fn() };
  return stub as unknown as Prisma.TransactionClient;
};
// Returns the bound parameter values of the n-th $executeRaw call made on tx.
const getQueryValues = (callIndex: number, tx: Prisma.TransactionClient) => {
  const [sql] = vi.mocked(tx.$executeRaw).mock.calls[callIndex];
  return (sql as Prisma.Sql).values;
};
// Unit tests for enqueueSurveyLifecycleJobs: assert the exact SQL bind values
// produced for each insert plus the trailing pg_notify call.
describe("enqueueSurveyLifecycleJobs", () => {
  beforeEach(() => {
    vi.mocked(logger.error).mockReset();
  });
  test("enqueues a start job when startsAt is set on create", async () => {
    const tx = createMockTx();
    const startsAt = new Date("2026-04-01T12:00:00.000Z");
    await enqueueSurveyLifecycleJobs({
      tx,
      now: new Date("2026-03-31T12:00:00.000Z"),
      survey: {
        id: "survey_1",
        environmentId: "env_1",
        startsAt,
        endsAt: null,
      },
    });
    // Two raw queries: one job INSERT plus one pg_notify.
    expect(tx.$executeRaw).toHaveBeenCalledTimes(2);
    expect(getQueryValues(0, tx)).toEqual([
      JSON.stringify({
        surveyId: "survey_1",
        environmentId: "env_1",
        scheduledFor: startsAt.toISOString(),
      }),
      RIVER_SURVEY_START_KIND,
      RIVER_SURVEY_LIFECYCLE_MAX_ATTEMPTS,
      RIVER_SURVEY_LIFECYCLE_QUEUE,
      startsAt,
    ]);
    // Second call is pg_notify on "<default schema>.river_insert".
    expect(getQueryValues(1, tx)).toEqual([
      `river.${RIVER_INSERT_NOTIFICATION_CHANNEL}`,
      JSON.stringify({ queue: RIVER_SURVEY_LIFECYCLE_QUEUE }),
    ]);
  });
  test("enqueues an end job when endsAt is set on create", async () => {
    const tx = createMockTx();
    const endsAt = new Date("2026-04-02T12:00:00.000Z");
    await enqueueSurveyLifecycleJobs({
      tx,
      now: new Date("2026-03-31T12:00:00.000Z"),
      survey: {
        id: "survey_1",
        environmentId: "env_1",
        startsAt: null,
        endsAt,
      },
    });
    expect(tx.$executeRaw).toHaveBeenCalledTimes(2);
    expect(getQueryValues(0, tx)).toEqual([
      JSON.stringify({
        surveyId: "survey_1",
        environmentId: "env_1",
        scheduledFor: endsAt.toISOString(),
      }),
      RIVER_SURVEY_END_KIND,
      RIVER_SURVEY_LIFECYCLE_MAX_ATTEMPTS,
      RIVER_SURVEY_LIFECYCLE_QUEUE,
      endsAt,
    ]);
  });
  test("enqueues both lifecycle jobs when both dates are set on create", async () => {
    const tx = createMockTx();
    const startsAt = new Date("2026-04-01T12:00:00.000Z");
    const endsAt = new Date("2026-04-02T12:00:00.000Z");
    await enqueueSurveyLifecycleJobs({
      tx,
      now: new Date("2026-03-31T12:00:00.000Z"),
      survey: {
        id: "survey_1",
        environmentId: "env_1",
        startsAt,
        endsAt,
      },
    });
    // Two INSERTs (start, then end) plus the pg_notify.
    expect(tx.$executeRaw).toHaveBeenCalledTimes(3);
    expect(getQueryValues(0, tx)[1]).toBe(RIVER_SURVEY_START_KIND);
    expect(getQueryValues(1, tx)[1]).toBe(RIVER_SURVEY_END_KIND);
  });
  test("does nothing when neither lifecycle date is set", async () => {
    const tx = createMockTx();
    await enqueueSurveyLifecycleJobs({
      tx,
      survey: {
        id: "survey_1",
        environmentId: "env_1",
        startsAt: null,
        endsAt: null,
      },
    });
    expect(tx.$executeRaw).not.toHaveBeenCalled();
  });
  test("enqueues a lifecycle job when a date transitions from null to a value", async () => {
    const tx = createMockTx();
    const startsAt = new Date("2026-04-01T12:00:00.000Z");
    await enqueueSurveyLifecycleJobs({
      tx,
      now: new Date("2026-03-31T12:00:00.000Z"),
      survey: {
        id: "survey_1",
        environmentId: "env_1",
        startsAt,
        endsAt: null,
      },
      previousSurvey: {
        startsAt: null,
        endsAt: null,
      },
    });
    expect(tx.$executeRaw).toHaveBeenCalledTimes(2);
  });
  // Only null -> Date transitions enqueue; rescheduling is not supported here.
  test("does not enqueue when a lifecycle date changes after already being set", async () => {
    const tx = createMockTx();
    await enqueueSurveyLifecycleJobs({
      tx,
      survey: {
        id: "survey_1",
        environmentId: "env_1",
        startsAt: new Date("2026-04-02T12:00:00.000Z"),
        endsAt: null,
      },
      previousSurvey: {
        startsAt: new Date("2026-04-01T12:00:00.000Z"),
        endsAt: null,
      },
    });
    expect(tx.$executeRaw).not.toHaveBeenCalled();
  });
  test("does not enqueue when a lifecycle date is cleared", async () => {
    const tx = createMockTx();
    await enqueueSurveyLifecycleJobs({
      tx,
      survey: {
        id: "survey_1",
        environmentId: "env_1",
        startsAt: null,
        endsAt: null,
      },
      previousSurvey: {
        startsAt: new Date("2026-04-01T12:00:00.000Z"),
        endsAt: null,
      },
    });
    expect(tx.$executeRaw).not.toHaveBeenCalled();
  });
  test("logs and rethrows SQL errors", async () => {
    const tx = createMockTx();
    const queryError = new Error("insert failed");
    vi.mocked(tx.$executeRaw).mockRejectedValueOnce(queryError);
    await expect(
      enqueueSurveyLifecycleJobs({
        tx,
        survey: {
          id: "survey_1",
          environmentId: "env_1",
          startsAt: new Date("2026-04-01T12:00:00.000Z"),
          endsAt: null,
        },
      })
    ).rejects.toThrow(queryError);
    expect(logger.error).toHaveBeenCalledWith(
      { error: queryError, surveyId: "survey_1" },
      "Failed to enqueue survey lifecycle jobs"
    );
  });
});
// Unit tests for deleteSurveyLifecycleJobs: a single DELETE whose bind values
// are the default kinds, the survey id, then the pending states.
describe("deleteSurveyLifecycleJobs", () => {
  beforeEach(() => {
    vi.mocked(logger.error).mockReset();
  });
  test("deletes pending lifecycle jobs for the survey", async () => {
    const tx = createMockTx();
    await deleteSurveyLifecycleJobs({
      tx,
      surveyId: "survey_1",
    });
    expect(tx.$executeRaw).toHaveBeenCalledTimes(1);
    expect(getQueryValues(0, tx)).toEqual([
      RIVER_SURVEY_START_KIND,
      RIVER_SURVEY_END_KIND,
      "survey_1",
      ...RIVER_PENDING_JOB_STATES,
    ]);
  });
  test("logs and rethrows delete failures", async () => {
    const tx = createMockTx();
    const queryError = new Error("delete failed");
    vi.mocked(tx.$executeRaw).mockRejectedValueOnce(queryError);
    await expect(
      deleteSurveyLifecycleJobs({
        tx,
        surveyId: "survey_1",
      })
    ).rejects.toThrow(queryError);
    expect(logger.error).toHaveBeenCalledWith(
      { error: queryError, surveyId: "survey_1" },
      "Failed to delete pending survey lifecycle jobs"
    );
  });
});
// Best-effort TCP probe: resolves true only when a socket can connect to the
// host/port of `databaseURL` within 500ms. Any parse failure, connect error,
// or timeout resolves false so the integration suite can be skipped.
const canReachPostgres = async (databaseURL?: string): Promise<boolean> => {
  if (!databaseURL) {
    return false;
  }
  try {
    const target = new URL(databaseURL);
    const targetPort = Number(target.port || "5432");
    await new Promise<void>((resolve, reject) => {
      const probe = new Socket();
      // Single settle point: destroy the socket, then resolve or reject.
      const finish = (error?: Error) => {
        probe.destroy();
        if (error) {
          reject(error);
        } else {
          resolve();
        }
      };
      probe.setTimeout(500);
      probe.once("connect", () => finish());
      probe.once("timeout", () => finish(new Error("timeout")));
      probe.once("error", (error) => finish(error));
      probe.connect(targetPort, target.hostname);
    });
    return true;
  } catch {
    return false;
  }
};
// Integration suite runs only when a Postgres at DATABASE_URL is reachable;
// otherwise the whole describe is skipped.
const describeIfDatabase = (await canReachPostgres(process.env.DATABASE_URL)) ? describe : describe.skip;
describeIfDatabase("survey lifecycle integration", () => {
  let integrationPrisma: PrismaClient;
  // Per-test throwaway schema name; recreated in beforeEach.
  let schema: string;
  const quoteIdentifier = (identifier: string) => `"${identifier}"`;
  beforeAll(() => {
    // Use the real @prisma/client (bypassing any vi.mock of it) for a live connection.
    return vi
      .importActual<typeof import("@prisma/client")>("@prisma/client")
      .then(({ PrismaClient: ActualPrismaClient }) => {
        integrationPrisma = new ActualPrismaClient({
          datasources: {
            db: {
              url: process.env.DATABASE_URL,
            },
          },
        });
      });
  });
  beforeEach(async () => {
    // Unique schema per test so concurrent runs cannot collide.
    schema = `river_test_${randomUUID().replace(/-/g, "_")}`;
    await integrationPrisma.$executeRaw(Prisma.raw(`CREATE SCHEMA ${quoteIdentifier(schema)}`));
    // Minimal replica of River's job-state enum and river_job table —
    // only the columns the lifecycle helpers touch.
    await integrationPrisma.$executeRaw(
      Prisma.raw(`
        CREATE TYPE ${quoteIdentifier(schema)}.${quoteIdentifier("river_job_state")} AS ENUM (
          'available',
          'scheduled',
          'retryable',
          'completed'
        )
      `)
    );
    await integrationPrisma.$executeRaw(
      Prisma.raw(`
        CREATE TABLE ${quoteIdentifier(schema)}.${quoteIdentifier("river_job")} (
          id BIGSERIAL PRIMARY KEY,
          state ${quoteIdentifier(schema)}.${quoteIdentifier("river_job_state")} NOT NULL DEFAULT 'available',
          args JSONB NOT NULL,
          kind TEXT NOT NULL,
          max_attempts SMALLINT NOT NULL,
          queue TEXT NOT NULL,
          scheduled_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
          created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
        )
      `)
    );
  });
  afterEach(async () => {
    await integrationPrisma.$executeRaw(
      Prisma.raw(`DROP SCHEMA IF EXISTS ${quoteIdentifier(schema)} CASCADE`)
    );
  });
  afterAll(async () => {
    await integrationPrisma.$disconnect();
  });
  test("persists scheduled and immediate lifecycle jobs with the expected payload", async () => {
    const surveyId = "survey_1";
    const environmentId = "env_1";
    // startsAt is after `now` (scheduled insert); endsAt is before `now`
    // (immediate insert relying on the scheduled_at column default).
    const startsAt = new Date("2026-05-01T12:00:00.000Z");
    const endsAt = new Date("2026-03-01T12:00:00.000Z");
    const beforeInsert = new Date();
    await integrationPrisma.$transaction(async (tx) => {
      await enqueueSurveyLifecycleJobs({
        tx,
        schema,
        now: new Date("2026-04-01T12:00:00.000Z"),
        survey: {
          id: surveyId,
          environmentId,
          startsAt,
          endsAt: null,
        },
      });
      // Second call simulates an update that only adds endsAt; startsAt is
      // unchanged so no duplicate start job should be inserted.
      await enqueueSurveyLifecycleJobs({
        tx,
        schema,
        now: new Date("2026-04-01T12:00:00.000Z"),
        survey: {
          id: surveyId,
          environmentId,
          startsAt,
          endsAt,
        },
        previousSurvey: {
          startsAt,
          endsAt: null,
        },
      });
    });
    const afterInsert = new Date();
    const jobs = await integrationPrisma.$queryRaw<
      Array<{
        kind: string;
        queue: string;
        args: Record<string, string>;
        scheduled_at: Date;
        max_attempts: number;
      }>
    >(
      Prisma.raw(
        `SELECT kind, queue, args, scheduled_at, max_attempts
        FROM ${quoteIdentifier(schema)}.${quoteIdentifier("river_job")}
        ORDER BY kind ASC`
      )
    );
    expect(jobs).toHaveLength(2);
    expect(jobs).toEqual(
      expect.arrayContaining([
        expect.objectContaining({
          kind: RIVER_SURVEY_END_KIND,
          queue: RIVER_SURVEY_LIFECYCLE_QUEUE,
          args: {
            surveyId,
            environmentId,
            scheduledFor: endsAt.toISOString(),
          },
          max_attempts: RIVER_SURVEY_LIFECYCLE_MAX_ATTEMPTS,
        }),
        expect.objectContaining({
          kind: RIVER_SURVEY_START_KIND,
          queue: RIVER_SURVEY_LIFECYCLE_QUEUE,
          args: {
            surveyId,
            environmentId,
            scheduledFor: startsAt.toISOString(),
          },
          scheduled_at: startsAt,
          max_attempts: RIVER_SURVEY_LIFECYCLE_MAX_ATTEMPTS,
        }),
      ])
    );
    // The immediate (end) job got the NOW() default; allow ±1s clock skew
    // between the test process and the database server.
    const immediateJob = jobs.find((job) => job.kind === RIVER_SURVEY_END_KIND);
    expect(immediateJob?.scheduled_at.getTime()).toBeGreaterThanOrEqual(beforeInsert.getTime() - 1000);
    expect(immediateJob?.scheduled_at.getTime()).toBeLessThanOrEqual(afterInsert.getTime() + 1000);
  });
  test("removes only pending lifecycle jobs for the target survey", async () => {
    const surveyId = "survey_1";
    // Seed: one deletable pending job for survey_1, one completed job for
    // survey_1 (must survive), one pending job for a different survey (must survive).
    await integrationPrisma.$executeRaw(
      Prisma.raw(`
        INSERT INTO ${quoteIdentifier(schema)}.${quoteIdentifier("river_job")}
        (state, args, kind, max_attempts, queue)
        VALUES
        ('available', '{"surveyId":"survey_1","environmentId":"env_1","scheduledFor":"2026-04-01T12:00:00.000Z"}', '${RIVER_SURVEY_START_KIND}', 3, '${RIVER_SURVEY_LIFECYCLE_QUEUE}'),
        ('completed', '{"surveyId":"survey_1","environmentId":"env_1","scheduledFor":"2026-04-02T12:00:00.000Z"}', '${RIVER_SURVEY_END_KIND}', 3, '${RIVER_SURVEY_LIFECYCLE_QUEUE}'),
        ('retryable', '{"surveyId":"survey_2","environmentId":"env_1","scheduledFor":"2026-04-03T12:00:00.000Z"}', '${RIVER_SURVEY_START_KIND}', 3, '${RIVER_SURVEY_LIFECYCLE_QUEUE}')
      `)
    );
    await integrationPrisma.$transaction(async (tx) => {
      await deleteSurveyLifecycleJobs({
        tx,
        surveyId,
        schema,
      });
    });
    const remainingJobs = await integrationPrisma.$queryRaw<
      Array<{ state: string; kind: string; args: { surveyId: string } }>
    >(
      Prisma.raw(
        `SELECT state, kind, args
        FROM ${quoteIdentifier(schema)}.${quoteIdentifier("river_job")}
        ORDER BY state ASC, kind ASC`
      )
    );
    expect(remainingJobs).toEqual([
      {
        state: "completed",
        kind: RIVER_SURVEY_END_KIND,
        args: {
          surveyId: "survey_1",
          environmentId: "env_1",
          scheduledFor: "2026-04-02T12:00:00.000Z",
        },
      },
      {
        state: "retryable",
        kind: RIVER_SURVEY_START_KIND,
        args: {
          surveyId: "survey_2",
          environmentId: "env_1",
          scheduledFor: "2026-04-03T12:00:00.000Z",
        },
      },
    ]);
  });
});

View File

@@ -0,0 +1,181 @@
import "server-only";
import { Prisma } from "@prisma/client";
import { logger } from "@formbricks/logger";
import { TSurvey } from "@formbricks/types/surveys/types";
import {
RIVER_INSERT_NOTIFICATION_CHANNEL,
RIVER_PENDING_JOB_STATES,
RIVER_SCHEMA,
RIVER_SURVEY_END_KIND,
RIVER_SURVEY_LIFECYCLE_MAX_ATTEMPTS,
RIVER_SURVEY_LIFECYCLE_QUEUE,
RIVER_SURVEY_START_KIND,
} from "./constants";
/** The two job kinds managed by this module: survey start and survey end. */
export type SurveyLifecycleJobKind = typeof RIVER_SURVEY_START_KIND | typeof RIVER_SURVEY_END_KIND;
/** JSON payload stored in river_job.args for a lifecycle job. */
export interface SurveyLifecycleJobArgs {
  surveyId: string;
  environmentId: string;
  // ISO-8601 timestamp of when the transition should take effect.
  scheduledFor: string;
}
/** Minimal survey projection needed to schedule lifecycle jobs. */
export interface SurveyLifecycleSurvey {
  id: TSurvey["id"];
  environmentId: TSurvey["environmentId"];
  startsAt?: TSurvey["startsAt"];
  endsAt?: TSurvey["endsAt"];
}
interface EnqueueSurveyLifecycleJobsOptions {
  // Transaction the inserts run in, so they commit/roll back with the survey write.
  tx: Prisma.TransactionClient;
  survey: SurveyLifecycleSurvey;
  // Pre-update values; omitted/null on create so every set date enqueues.
  previousSurvey?: Pick<SurveyLifecycleSurvey, "startsAt" | "endsAt"> | null;
  // Reference time for deciding scheduled vs. immediate; defaults to new Date().
  now?: Date;
  // Postgres schema holding the river_job table; defaults to RIVER_SCHEMA.
  schema?: string;
}
interface DeleteSurveyLifecycleJobsOptions {
  tx: Prisma.TransactionClient;
  surveyId: string;
  schema?: string;
  // Job kinds to delete; defaults to both lifecycle kinds.
  kinds?: SurveyLifecycleJobKind[];
}
// Safe, unquoted SQL identifiers: letters/underscore first, then letters,
// digits, or underscores.
const identifierPattern = /^[A-Za-z_][A-Za-z0-9_]*$/;
// Validates an identifier and wraps it in double quotes so it can be
// interpolated into raw SQL without risking injection.
const quoteIdentifier = (identifier: string): string => {
  const isSafe = identifierPattern.test(identifier);
  if (isSafe) {
    return `"${identifier}"`;
  }
  throw new Error(`Invalid SQL identifier: ${identifier}`);
};
// Renders `"schema"."river_job"` as a raw SQL fragment for table references.
const getQualifiedRiverJobTable = (schema: string): Prisma.Sql => {
  const qualifiedName = `${quoteIdentifier(schema)}.${quoteIdentifier("river_job")}`;
  return Prisma.raw(qualifiedName);
};
// Builds the pg_notify channel name ("<schema>.river_insert"), validating the
// schema name even though the channel is passed as a bound parameter.
const getQualifiedInsertNotificationChannel = (schema: string): string => {
  if (identifierPattern.test(schema)) {
    return `${schema}.${RIVER_INSERT_NOTIFICATION_CHANNEL}`;
  }
  throw new Error(`Invalid SQL identifier: ${schema}`);
};
// A lifecycle job is enqueued only on an unset -> Date transition; changing or
// clearing an already-set date never enqueues. Acts as a type predicate so the
// caller's `nextValue` argument is narrowed to Date on a true result.
const shouldEnqueueTransition = (previousValue?: Date | null, nextValue?: Date | null): nextValue is Date => {
  const wasUnset = previousValue == null;
  const isNowSet = nextValue != null;
  return wasUnset && isNowSet;
};
// Assembles the River job payload: survey identifiers plus the ISO timestamp
// of when the transition should take effect.
const buildJobArgs = (survey: SurveyLifecycleSurvey, scheduledFor: Date): SurveyLifecycleJobArgs => {
  return {
    surveyId: survey.id,
    environmentId: survey.environmentId,
    scheduledFor: scheduledFor.toISOString(),
  };
};
// Inserts one lifecycle job row. Future-dated jobs get an explicit
// scheduled_at; past/present jobs omit it so the column default applies and
// the job is runnable right away. The two INSERT variants are kept separate
// on purpose so each SQL statement stays literal and reviewable.
const enqueueLifecycleJob = async (
  tx: Prisma.TransactionClient,
  {
    kind,
    survey,
    scheduledFor,
    schema,
    now,
  }: {
    kind: SurveyLifecycleJobKind;
    survey: SurveyLifecycleSurvey;
    scheduledFor: Date;
    schema: string;
    now: Date;
  }
): Promise<void> => {
  // Payload is serialized here and cast to jsonb in SQL.
  const args = JSON.stringify(buildJobArgs(survey, scheduledFor));
  const riverJobTable = getQualifiedRiverJobTable(schema);
  if (scheduledFor.getTime() > now.getTime()) {
    // Future transition: persist the explicit run time.
    await tx.$executeRaw(
      Prisma.sql`
        INSERT INTO ${riverJobTable} (args, kind, max_attempts, queue, scheduled_at)
        VALUES (
          ${args}::jsonb,
          ${kind},
          ${RIVER_SURVEY_LIFECYCLE_MAX_ATTEMPTS},
          ${RIVER_SURVEY_LIFECYCLE_QUEUE},
          ${scheduledFor}
        )
      `
    );
  } else {
    // Already due: rely on the table's scheduled_at default.
    await tx.$executeRaw(
      Prisma.sql`
        INSERT INTO ${riverJobTable} (args, kind, max_attempts, queue)
        VALUES (
          ${args}::jsonb,
          ${kind},
          ${RIVER_SURVEY_LIFECYCLE_MAX_ATTEMPTS},
          ${RIVER_SURVEY_LIFECYCLE_QUEUE}
        )
      `
    );
  }
};
// Fires a pg_notify on the schema-qualified insert channel so any listener
// polling the lifecycle queue wakes up immediately after our inserts.
const notifyLifecycleQueue = async (tx: Prisma.TransactionClient, schema: string): Promise<void> => {
  const channel = getQualifiedInsertNotificationChannel(schema);
  const payload = JSON.stringify({ queue: RIVER_SURVEY_LIFECYCLE_QUEUE });
  await tx.$executeRaw(Prisma.sql`SELECT pg_notify(${channel}, ${payload})`);
};
/**
 * Enqueues River jobs for survey lifecycle transitions (start/end).
 *
 * A job is enqueued only when a lifecycle date transitions from unset to set
 * (create, or an update from null); changing or clearing an existing date does
 * not enqueue. After inserting, the lifecycle queue is notified so immediate
 * jobs are picked up without waiting for the next poll.
 *
 * Runs inside the caller's transaction; failures are logged with the survey id
 * and rethrown so the surrounding transaction rolls back.
 */
export const enqueueSurveyLifecycleJobs = async ({
  tx,
  survey,
  previousSurvey,
  now = new Date(),
  schema = RIVER_SCHEMA,
}: EnqueueSurveyLifecycleJobsOptions): Promise<void> => {
  const pendingJobs: Array<{ kind: SurveyLifecycleJobKind; scheduledFor: Date }> = [];
  // Normalize to locals so the shouldEnqueueTransition type predicate narrows
  // them to Date, avoiding the previous `as Date` casts.
  const nextStartsAt = survey.startsAt ?? null;
  const nextEndsAt = survey.endsAt ?? null;
  if (shouldEnqueueTransition(previousSurvey?.startsAt ?? null, nextStartsAt)) {
    pendingJobs.push({ kind: RIVER_SURVEY_START_KIND, scheduledFor: nextStartsAt });
  }
  if (shouldEnqueueTransition(previousSurvey?.endsAt ?? null, nextEndsAt)) {
    pendingJobs.push({ kind: RIVER_SURVEY_END_KIND, scheduledFor: nextEndsAt });
  }
  if (pendingJobs.length === 0) {
    return;
  }
  try {
    for (const job of pendingJobs) {
      await enqueueLifecycleJob(tx, { ...job, survey, schema, now });
    }
    // One notification covers all inserts in this call.
    await notifyLifecycleQueue(tx, schema);
  } catch (error) {
    logger.error({ error, surveyId: survey.id }, "Failed to enqueue survey lifecycle jobs");
    throw error;
  }
};
/**
 * Deletes pending River lifecycle jobs for a survey.
 *
 * Only jobs in a pending state (available/scheduled/retryable) are removed;
 * completed jobs are left untouched. Runs inside the caller's transaction;
 * failures are logged with the survey id and rethrown.
 */
export const deleteSurveyLifecycleJobs = async ({
  tx,
  surveyId,
  schema = RIVER_SCHEMA,
  kinds = [RIVER_SURVEY_START_KIND, RIVER_SURVEY_END_KIND],
}: DeleteSurveyLifecycleJobsOptions): Promise<void> => {
  // Prisma.join throws on an empty list; an explicit empty `kinds` is a no-op.
  if (kinds.length === 0) {
    return;
  }
  try {
    await tx.$executeRaw(
      Prisma.sql`
        DELETE FROM ${getQualifiedRiverJobTable(schema)}
        WHERE kind IN (${Prisma.join(kinds)})
        AND args->>'surveyId' = ${surveyId}
        AND state IN (${Prisma.join([...RIVER_PENDING_JOB_STATES])})
      `
    );
  } catch (error) {
    logger.error({ error, surveyId }, "Failed to delete pending survey lifecycle jobs");
    throw error;
  }
};

View File

@@ -193,6 +193,8 @@ const mockWelcomeCard: TSurveyWelcomeCard = {
const baseSurveyProperties = {
id: mockId,
name: "Mock Survey",
startsAt: null,
endsAt: null,
autoClose: 10,
delay: 0,
autoComplete: 7,

View File

@@ -1,11 +1,17 @@
import { prisma } from "@/lib/__mocks__/database";
import { prisma } from "../__mocks__/database";
import { ActionClass, Prisma, Survey } from "@prisma/client";
import { beforeEach, describe, expect, test, vi } from "vitest";
import { testInputValidation } from "vitestSetup";
import { PrismaErrorType } from "@formbricks/database/types/error";
import { TSurveyFollowUp } from "@formbricks/database/types/survey-follow-up";
import { logger } from "@formbricks/logger";
import { TActionClass } from "@formbricks/types/action-classes";
import { DatabaseError, InvalidInputError, ResourceNotFoundError } from "@formbricks/types/errors";
import {
DatabaseError,
InvalidInputError,
ResourceNotFoundError,
ValidationError,
} from "@formbricks/types/errors";
import { TSegment } from "@formbricks/types/segment";
import { TSurvey, TSurveyCreateInput, TSurveyQuestionTypeEnum } from "@formbricks/types/surveys/types";
import { getActionClasses } from "@/lib/actionClass/service";
@@ -13,6 +19,7 @@ import {
getOrganizationByEnvironmentId,
subscribeOrganizationMembersToSurveyResponses,
} from "@/lib/organization/service";
import { enqueueSurveyLifecycleJobs } from "@/lib/river/survey-lifecycle";
import { evaluateLogic } from "@/lib/surveyLogic/utils";
import {
mockActionClass,
@@ -36,6 +43,8 @@ import {
updateSurveyInternal,
} from "./service";
vi.mock("server-only", () => ({}));
// Mock organization service
vi.mock("@/lib/organization/service", () => ({
getOrganizationByEnvironmentId: vi.fn().mockResolvedValue({
@@ -49,8 +58,21 @@ vi.mock("@/lib/actionClass/service", () => ({
getActionClasses: vi.fn(),
}));
vi.mock("@/lib/river/survey-lifecycle", () => ({
enqueueSurveyLifecycleJobs: vi.fn(),
}));
vi.mock("@formbricks/logger", () => ({
logger: {
error: vi.fn(),
},
}));
beforeEach(() => {
prisma.survey.count.mockResolvedValue(1);
prisma.$transaction.mockImplementation(async (callback: any) => callback(prisma));
vi.mocked(enqueueSurveyLifecycleJobs).mockReset();
vi.mocked(logger.error).mockReset();
});
describe("evaluateLogic with mockSurveyWithLogic", () => {
@@ -309,6 +331,35 @@ describe("Tests for updateSurvey", () => {
expect(updatedSurvey).toEqual(mockTransformedSurveyOutput);
});
test("enqueues lifecycle jobs when a date is assigned for the first time", async () => {
const startsAt = new Date("2026-04-01T12:00:00.000Z");
const currentSurvey = { ...mockSurveyOutput, startsAt: null, endsAt: null };
const persistedSurvey = { ...mockSurveyOutput, startsAt, endsAt: null };
prisma.survey.findUnique.mockResolvedValueOnce(currentSurvey);
prisma.survey.update.mockResolvedValueOnce(persistedSurvey);
await updateSurvey({
...updateSurveyInput,
startsAt,
endsAt: null,
});
expect(enqueueSurveyLifecycleJobs).toHaveBeenCalledWith({
tx: prisma,
survey: {
id: persistedSurvey.id,
environmentId: persistedSurvey.environmentId,
startsAt,
endsAt: null,
},
previousSurvey: {
startsAt: null,
endsAt: null,
},
});
});
// Note: Language handling tests (for languages.length > 0 fix) are covered in
// apps/web/modules/survey/editor/lib/survey.test.ts where we have better control
// over the test mocks. The key fix ensures languages.length > 0 (not > 1) is used.
@@ -341,6 +392,31 @@ describe("Tests for updateSurvey", () => {
prisma.survey.update.mockRejectedValue(new Error(mockErrorMessage));
await expect(updateSurvey(updateSurveyInput)).rejects.toThrow(Error);
});
test("logs and rethrows lifecycle enqueue failures", async () => {
const enqueueError = new Error("enqueue failed");
prisma.survey.findUnique.mockResolvedValueOnce({
...mockSurveyOutput,
startsAt: null,
endsAt: null,
});
prisma.survey.update.mockResolvedValueOnce({
...mockSurveyOutput,
startsAt: new Date("2026-04-01T12:00:00.000Z"),
endsAt: null,
});
vi.mocked(enqueueSurveyLifecycleJobs).mockRejectedValueOnce(enqueueError);
await expect(
updateSurvey({
...updateSurveyInput,
startsAt: new Date("2026-04-01T12:00:00.000Z"),
})
).rejects.toThrow(enqueueError);
expect(logger.error).toHaveBeenCalledWith(enqueueError, "Error updating survey");
});
});
});
@@ -647,6 +723,36 @@ describe("Tests for createSurvey", () => {
expect(subscribeOrganizationMembersToSurveyResponses).toHaveBeenCalled();
});
test("passes start and end dates to the lifecycle scheduler", async () => {
const startsAt = new Date("2026-04-02T08:00:00.000Z");
const endsAt = new Date("2026-04-03T08:00:00.000Z");
vi.mocked(getOrganizationByEnvironmentId).mockResolvedValueOnce(mockOrganizationOutput);
prisma.survey.create.mockResolvedValueOnce({
...mockSurveyOutput,
startsAt,
endsAt,
type: "link",
});
await createSurvey(mockEnvironmentId, {
...mockCreateSurveyInput,
type: "link",
startsAt,
endsAt,
});
expect(enqueueSurveyLifecycleJobs).toHaveBeenCalledWith({
tx: prisma,
survey: {
id: mockSurveyOutput.id,
environmentId: mockSurveyOutput.environmentId,
startsAt,
endsAt,
},
});
});
test("creates a private segment for app surveys", async () => {
vi.mocked(getOrganizationByEnvironmentId).mockResolvedValueOnce(mockOrganizationOutput);
prisma.survey.create.mockResolvedValueOnce({
@@ -663,6 +769,10 @@ describe("Tests for createSurvey", () => {
createdAt: new Date(),
updatedAt: new Date(),
} as unknown as TSegment);
prisma.survey.update.mockResolvedValueOnce({
...mockSurveyOutput,
type: "app",
});
await createSurvey(mockEnvironmentId, {
...mockCreateSurveyInput,
@@ -725,6 +835,20 @@ describe("Tests for createSurvey", () => {
describe("Sad Path", () => {
testInputValidation(createSurvey, "123#", mockCreateSurveyInput);
test("rejects surveys whose start date is not before the end date", async () => {
const startsAt = new Date("2026-04-03T08:00:00.000Z");
const endsAt = new Date("2026-04-02T08:00:00.000Z");
await expect(
createSurvey(mockEnvironmentId, {
...mockCreateSurveyInput,
type: "link",
startsAt,
endsAt,
})
).rejects.toThrow(ValidationError);
});
test("throws ResourceNotFoundError if organization not found", async () => {
vi.mocked(getOrganizationByEnvironmentId).mockResolvedValueOnce(null);
await expect(createSurvey(mockEnvironmentId, mockCreateSurveyInput)).rejects.toThrow(

View File

@@ -11,6 +11,7 @@ import {
getOrganizationByEnvironmentId,
subscribeOrganizationMembersToSurveyResponses,
} from "@/lib/organization/service";
import { enqueueSurveyLifecycleJobs } from "@/lib/river/survey-lifecycle";
import { TriggerUpdate } from "@/modules/survey/editor/types/survey-trigger";
import { getActionClasses } from "../actionClass/service";
import { ITEMS_PER_PAGE } from "../constants";
@@ -32,6 +33,8 @@ export const selectSurvey = {
environmentId: true,
createdBy: true,
status: true,
startsAt: true,
endsAt: true,
welcomeCard: true,
questions: true,
blocks: true,
@@ -300,8 +303,6 @@ export const updateSurveyInternal = async (
try {
const surveyId = updatedSurvey.id;
let data: any = {};
const actionClasses = await getActionClasses(updatedSurvey.environmentId);
const currentSurvey = await getSurvey(surveyId);
@@ -324,132 +325,139 @@ export const updateSurveyInternal = async (
}
}
if (languages) {
// Process languages update logic here
// Extract currentLanguageIds and updatedLanguageIds
const currentLanguageIds = currentSurvey.languages
? currentSurvey.languages.map((l) => l.language.id)
: [];
const updatedLanguageIds =
languages.length > 0 ? updatedSurvey.languages.map((l) => l.language.id) : [];
const enabledLanguageIds = languages.map((language) => {
if (language.enabled) return language.language.id;
});
// Determine languages to add and remove
const languagesToAdd = updatedLanguageIds.filter((id) => !currentLanguageIds.includes(id));
const languagesToRemove = currentLanguageIds.filter((id) => !updatedLanguageIds.includes(id));
const defaultLanguageId = updatedSurvey.languages.find((l) => l.default)?.language.id;
// Prepare data for Prisma update
data.languages = {};
// Update existing languages for default value changes
data.languages.updateMany = currentSurvey.languages.map((surveyLanguage) => ({
where: { languageId: surveyLanguage.language.id },
data: {
default: surveyLanguage.language.id === defaultLanguageId,
enabled: enabledLanguageIds.includes(surveyLanguage.language.id),
},
}));
// Add new languages
if (languagesToAdd.length > 0) {
data.languages.create = languagesToAdd.map((languageId) => ({
languageId: languageId,
default: languageId === defaultLanguageId,
enabled: enabledLanguageIds.includes(languageId),
}));
}
// Remove languages no longer associated with the survey
if (languagesToRemove.length > 0) {
data.languages.deleteMany = languagesToRemove.map((languageId) => ({
languageId: languageId,
enabled: enabledLanguageIds.includes(languageId),
}));
}
const organization = await getOrganizationByEnvironmentId(environmentId);
if (!organization) {
throw new ResourceNotFoundError("Organization", null);
}
if (triggers) {
data.triggers = handleTriggerUpdates(triggers, currentSurvey.triggers, actionClasses);
}
const prismaSurvey = await prisma.$transaction(async (tx) => {
let data: any = {};
// if the survey body has type other than "app" but has a private segment, we delete that segment, and if it has a public segment, we disconnect from to the survey
if (segment) {
if (type === "app") {
// parse the segment filters:
const parsedFilters = ZSegmentFilters.safeParse(segment.filters);
if (!skipValidation && !parsedFilters.success) {
throw new InvalidInputError("Invalid user segment filters");
if (languages) {
// Process languages update logic here
// Extract currentLanguageIds and updatedLanguageIds
const currentLanguageIds = currentSurvey.languages
? currentSurvey.languages.map((l) => l.language.id)
: [];
const updatedLanguageIds =
languages.length > 0 ? updatedSurvey.languages.map((l) => l.language.id) : [];
const enabledLanguageIds = languages.map((language) => {
if (language.enabled) return language.language.id;
});
// Determine languages to add and remove
const languagesToAdd = updatedLanguageIds.filter((id) => !currentLanguageIds.includes(id));
const languagesToRemove = currentLanguageIds.filter((id) => !updatedLanguageIds.includes(id));
const defaultLanguageId = updatedSurvey.languages.find((l) => l.default)?.language.id;
// Prepare data for Prisma update
data.languages = {};
// Update existing languages for default value changes
data.languages.updateMany = currentSurvey.languages.map((surveyLanguage) => ({
where: { languageId: surveyLanguage.language.id },
data: {
default: surveyLanguage.language.id === defaultLanguageId,
enabled: enabledLanguageIds.includes(surveyLanguage.language.id),
},
}));
// Add new languages
if (languagesToAdd.length > 0) {
data.languages.create = languagesToAdd.map((languageId) => ({
languageId: languageId,
default: languageId === defaultLanguageId,
enabled: enabledLanguageIds.includes(languageId),
}));
}
try {
// update the segment:
let updatedInput: Prisma.SegmentUpdateInput = {
...segment,
surveys: undefined,
};
// Remove languages no longer associated with the survey
if (languagesToRemove.length > 0) {
data.languages.deleteMany = languagesToRemove.map((languageId) => ({
languageId: languageId,
enabled: enabledLanguageIds.includes(languageId),
}));
}
}
if (segment.surveys) {
updatedInput = {
...segment,
surveys: {
connect: segment.surveys.map((surveyId) => ({ id: surveyId })),
},
};
if (triggers) {
data.triggers = handleTriggerUpdates(triggers, currentSurvey.triggers, actionClasses);
}
// if the survey body has type other than "app" but has a private segment, we delete that segment, and if it has a public segment, we disconnect from to the survey
if (segment) {
if (type === "app") {
// parse the segment filters:
const parsedFilters = ZSegmentFilters.safeParse(segment.filters);
if (!skipValidation && !parsedFilters.success) {
throw new InvalidInputError("Invalid user segment filters");
}
await prisma.segment.update({
where: { id: segment.id },
data: updatedInput,
select: {
surveys: { select: { id: true } },
environmentId: true,
id: true,
},
});
} catch (error) {
logger.error(error, "Error updating survey");
throw new Error("Error updating survey");
}
} else {
if (segment.isPrivate) {
// disconnect the private segment first and then delete:
await prisma.segment.update({
where: { id: segment.id },
data: {
surveys: {
disconnect: {
id: surveyId,
try {
// update the segment:
let updatedInput: Prisma.SegmentUpdateInput = {
...segment,
surveys: undefined,
};
if (segment.surveys) {
updatedInput = {
...segment,
surveys: {
connect: segment.surveys.map((segmentSurveyId) => ({ id: segmentSurveyId })),
},
};
}
await tx.segment.update({
where: { id: segment.id },
data: updatedInput,
select: {
surveys: { select: { id: true } },
environmentId: true,
id: true,
},
});
} catch (error) {
logger.error(error, "Error updating survey");
throw new Error("Error updating survey");
}
} else {
if (segment.isPrivate) {
// disconnect the private segment first and then delete:
await tx.segment.update({
where: { id: segment.id },
data: {
surveys: {
disconnect: {
id: surveyId,
},
},
},
},
});
});
// delete the private segment:
await prisma.segment.delete({
where: {
id: segment.id,
},
});
} else {
await prisma.survey.update({
where: {
id: surveyId,
},
data: {
segment: {
disconnect: true,
// delete the private segment:
await tx.segment.delete({
where: {
id: segment.id,
},
},
});
});
} else {
await tx.survey.update({
where: {
id: surveyId,
},
data: {
segment: {
disconnect: true,
},
},
});
}
}
}
} else if (type === "app") {
if (!currentSurvey.segment) {
await prisma.survey.update({
} else if (type === "app" && !currentSurvey.segment) {
await tx.survey.update({
where: {
id: surveyId,
},
@@ -477,102 +485,98 @@ export const updateSurveyInternal = async (
},
});
}
}
if (followUps) {
// Separate follow-ups into categories based on deletion flag
const deletedFollowUps = followUps.filter((followUp) => followUp.deleted);
const nonDeletedFollowUps = followUps.filter((followUp) => !followUp.deleted);
if (followUps) {
// Separate follow-ups into categories based on deletion flag
const deletedFollowUps = followUps.filter((followUp) => followUp.deleted);
const nonDeletedFollowUps = followUps.filter((followUp) => !followUp.deleted);
// Get set of existing follow-up IDs from currentSurvey
const existingFollowUpIds = new Set(currentSurvey.followUps.map((f) => f.id));
// Get set of existing follow-up IDs from currentSurvey
const existingFollowUpIds = new Set(currentSurvey.followUps.map((f) => f.id));
// Separate non-deleted follow-ups into new and existing
const existingFollowUps = nonDeletedFollowUps.filter((followUp) =>
existingFollowUpIds.has(followUp.id)
);
const newFollowUps = nonDeletedFollowUps.filter((followUp) => !existingFollowUpIds.has(followUp.id));
// Separate non-deleted follow-ups into new and existing
const existingFollowUps = nonDeletedFollowUps.filter((followUp) =>
existingFollowUpIds.has(followUp.id)
);
const newFollowUps = nonDeletedFollowUps.filter((followUp) => !existingFollowUpIds.has(followUp.id));
data.followUps = {
// Update existing follow-ups
updateMany: existingFollowUps.map((followUp) => ({
where: {
id: followUp.id,
},
data: {
name: followUp.name,
trigger: followUp.trigger,
action: followUp.action,
},
})),
// Create new follow-ups
createMany:
newFollowUps.length > 0
? {
data: newFollowUps.map((followUp) => ({
data.followUps = {
// Update existing follow-ups
updateMany: existingFollowUps.map((followUp) => ({
where: {
id: followUp.id,
},
data: {
name: followUp.name,
trigger: followUp.trigger,
action: followUp.action,
},
})),
// Create new follow-ups
createMany:
newFollowUps.length > 0
? {
data: newFollowUps.map((followUp) => ({
id: followUp.id,
name: followUp.name,
trigger: followUp.trigger,
action: followUp.action,
})),
}
: undefined,
// Delete follow-ups marked as deleted, regardless of whether they exist in DB
deleteMany:
deletedFollowUps.length > 0
? deletedFollowUps.map((followUp) => ({
id: followUp.id,
name: followUp.name,
trigger: followUp.trigger,
action: followUp.action,
})),
}
: undefined,
// Delete follow-ups marked as deleted, regardless of whether they exist in DB
deleteMany:
deletedFollowUps.length > 0
? deletedFollowUps.map((followUp) => ({
id: followUp.id,
}))
: undefined,
};
}
}))
: undefined,
};
}
data.questions = questions.map((question) => {
const { isDraft, ...rest } = question;
return rest;
data.questions = questions.map((question) => {
const { isDraft, ...rest } = question;
return rest;
});
// Strip isDraft from elements before saving
if (updatedSurvey.blocks && updatedSurvey.blocks.length > 0) {
data.blocks = stripIsDraftFromBlocks(updatedSurvey.blocks);
}
surveyData.updatedAt = new Date();
data = {
...surveyData,
...data,
type,
};
delete data.createdBy;
const prismaSurvey = await tx.survey.update({
where: { id: surveyId },
data,
select: selectSurvey,
});
await enqueueSurveyLifecycleJobs({
tx,
survey: {
id: prismaSurvey.id,
environmentId: prismaSurvey.environmentId,
startsAt: prismaSurvey.startsAt,
endsAt: prismaSurvey.endsAt,
},
previousSurvey: {
startsAt: currentSurvey.startsAt ?? null,
endsAt: currentSurvey.endsAt ?? null,
},
});
return prismaSurvey;
});
// Strip isDraft from elements before saving
if (updatedSurvey.blocks && updatedSurvey.blocks.length > 0) {
data.blocks = stripIsDraftFromBlocks(updatedSurvey.blocks);
}
const organization = await getOrganizationByEnvironmentId(environmentId);
if (!organization) {
throw new ResourceNotFoundError("Organization", null);
}
surveyData.updatedAt = new Date();
data = {
...surveyData,
...data,
type,
};
delete data.createdBy;
const prismaSurvey = await prisma.survey.update({
where: { id: surveyId },
data,
select: selectSurvey,
});
let surveySegment: TSegment | null = null;
if (prismaSurvey.segment) {
surveySegment = {
...prismaSurvey.segment,
surveys: prismaSurvey.segment.surveys.map((survey) => survey.id),
};
}
const modifiedSurvey: TSurvey = {
...prismaSurvey, // Properties from prismaSurvey
displayPercentage: Number(prismaSurvey.displayPercentage) || null,
segment: surveySegment,
customHeadScriptsMode: prismaSurvey.customHeadScriptsMode,
};
return modifiedSurvey;
return transformPrismaSurvey<TSurvey>(prismaSurvey);
} catch (error) {
logger.error(error, "Error updating survey");
if (error instanceof Prisma.PrismaClientKnownRequestError) {
@@ -651,64 +655,69 @@ export const createSurvey = async (
data.blocks = validateMediaAndPrepareBlocks(data.blocks);
}
const survey = await prisma.survey.create({
data: {
...data,
environment: {
connect: {
id: parsedEnvironmentId,
},
},
},
select: selectSurvey,
});
// if the survey created is an "app" survey, we also create a private segment for it.
if (survey.type === "app") {
const newSegment = await prisma.segment.create({
const survey = await prisma.$transaction(async (tx) => {
const createdSurvey = await tx.survey.create({
data: {
title: survey.id,
filters: [],
isPrivate: true,
...data,
environment: {
connect: {
id: parsedEnvironmentId,
},
},
},
select: selectSurvey,
});
await prisma.survey.update({
where: {
id: survey.id,
},
data: {
segment: {
connect: {
id: newSegment.id,
let createdOrUpdatedSurvey = createdSurvey;
// if the survey created is an "app" survey, we also create a private segment for it.
if (createdSurvey.type === "app") {
const newSegment = await tx.segment.create({
data: {
title: createdSurvey.id,
filters: [],
isPrivate: true,
environment: {
connect: {
id: parsedEnvironmentId,
},
},
},
});
createdOrUpdatedSurvey = await tx.survey.update({
where: {
id: createdSurvey.id,
},
data: {
segment: {
connect: {
id: newSegment.id,
},
},
},
select: selectSurvey,
});
}
await enqueueSurveyLifecycleJobs({
tx,
survey: {
id: createdOrUpdatedSurvey.id,
environmentId: createdOrUpdatedSurvey.environmentId,
startsAt: createdOrUpdatedSurvey.startsAt,
endsAt: createdOrUpdatedSurvey.endsAt,
},
});
}
// TODO: Fix this, this happens because the survey type "web" is no longer in the zod types but its required in the schema for migration
// @ts-expect-error
const transformedSurvey: TSurvey = {
...survey,
...(survey.segment && {
segment: {
...survey.segment,
surveys: survey.segment.surveys.map((survey) => survey.id),
},
}),
};
return createdOrUpdatedSurvey;
});
if (createdBy) {
await subscribeOrganizationMembersToSurveyResponses(survey.id, createdBy, organization.id);
}
return transformedSurvey;
return transformPrismaSurvey<TSurvey>(survey);
} catch (error) {
if (error instanceof Prisma.PrismaClientKnownRequestError) {
logger.error(error, "Error creating survey");

View File

@@ -31,6 +31,8 @@ export const ZSurveyInput = ZSurveyWithoutQuestionType.pick({
environmentId: true,
questions: true,
blocks: true,
startsAt: true,
endsAt: true,
endings: true,
hiddenFields: true,
variables: true,
@@ -59,6 +61,8 @@ export const ZSurveyInput = ZSurveyWithoutQuestionType.pick({
displayLimit: true,
autoClose: true,
autoComplete: true,
startsAt: true,
endsAt: true,
surveyClosedMessage: true,
styling: true,
projectOverwrites: true,

View File

@@ -1,284 +1,43 @@
import { ActionClass, Prisma } from "@prisma/client";
import "@testing-library/jest-dom/vitest";
import { beforeEach, describe, expect, test, vi } from "vitest";
import { prisma } from "@formbricks/database";
import { logger } from "@formbricks/logger";
import { DatabaseError, InvalidInputError, ResourceNotFoundError } from "@formbricks/types/errors";
import { TSurveyCreateInput } from "@formbricks/types/surveys/types";
import { DatabaseError } from "@formbricks/types/errors";
import { TSurvey, TSurveyCreateInput } from "@formbricks/types/surveys/types";
import {
getOrganizationByEnvironmentId,
subscribeOrganizationMembersToSurveyResponses,
} from "@/lib/organization/service";
import { getActionClasses } from "@/modules/survey/lib/action-class";
import { selectSurvey } from "@/modules/survey/lib/survey";
createSurvey as createSurveyFromService,
handleTriggerUpdates as handleTriggerUpdatesFromService,
} from "@/lib/survey/service";
import { createSurvey, handleTriggerUpdates } from "./survey";
// Mock dependencies
vi.mock("@/lib/survey/utils", () => ({
checkForInvalidImagesInQuestions: vi.fn(),
vi.mock("@/lib/survey/service", () => ({
createSurvey: vi.fn(),
handleTriggerUpdates: vi.fn(),
}));
vi.mock("@/lib/organization/service", () => ({
subscribeOrganizationMembersToSurveyResponses: vi.fn(),
getOrganizationByEnvironmentId: vi.fn(),
}));
describe("template list survey wrappers", () => {
const environmentId = "env_1";
const surveyBody = { name: "Survey" } as TSurveyCreateInput;
const createdSurvey = { id: "survey_1" } as TSurvey;
vi.mock("@/modules/survey/lib/action-class", () => ({
getActionClasses: vi.fn(),
}));
vi.mock("@/modules/survey/lib/survey", () => ({
selectSurvey: {
id: true,
createdAt: true,
updatedAt: true,
name: true,
type: true,
status: true,
environmentId: true,
segment: true,
},
}));
vi.mock("@formbricks/database", () => ({
prisma: {
survey: {
create: vi.fn(),
update: vi.fn(),
},
segment: {
create: vi.fn(),
},
},
}));
vi.mock("@formbricks/logger", () => ({
logger: {
error: vi.fn(),
},
}));
describe("survey module", () => {
beforeEach(() => {
vi.resetAllMocks();
vi.clearAllMocks();
});
describe("createSurvey", () => {
test("creates a survey successfully", async () => {
// Mock input data
const environmentId = "env-123";
const surveyBody: TSurveyCreateInput = {
name: "Test Survey",
type: "app",
status: "draft",
questions: [],
createdBy: "user-123",
};
// Mock dependencies
const mockActionClasses: ActionClass[] = [];
vi.mocked(getActionClasses).mockResolvedValue(mockActionClasses);
vi.mocked(getOrganizationByEnvironmentId).mockResolvedValue({ id: "org-123", name: "Org" } as any);
const mockCreatedSurvey = {
id: "survey-123",
environmentId,
type: "app",
segment: {
surveys: [{ id: "survey-123" }],
},
} as any;
vi.mocked(prisma.survey.create).mockResolvedValue(mockCreatedSurvey);
const mockSegment = { id: "segment-123" } as any;
vi.mocked(prisma.segment.create).mockResolvedValue(mockSegment);
// Execute function
const result = await createSurvey(environmentId, surveyBody);
// Verify results
expect(getActionClasses).toHaveBeenCalledWith(environmentId);
expect(getOrganizationByEnvironmentId).toHaveBeenCalledWith(environmentId);
expect(prisma.survey.create).toHaveBeenCalledWith({
data: expect.objectContaining({
name: surveyBody.name,
type: surveyBody.type,
environment: { connect: { id: environmentId } },
creator: { connect: { id: surveyBody.createdBy } },
}),
select: selectSurvey,
});
expect(prisma.segment.create).toHaveBeenCalled();
expect(prisma.survey.update).toHaveBeenCalled();
expect(subscribeOrganizationMembersToSurveyResponses).toHaveBeenCalledWith(
"survey-123",
"user-123",
"org-123"
);
expect(result).toBeDefined();
expect(result.id).toBe("survey-123");
});
test("handles empty languages array", async () => {
const environmentId = "env-123";
const surveyBody: TSurveyCreateInput = {
name: "Test Survey",
type: "app",
status: "draft",
languages: [],
questions: [],
};
vi.mocked(getActionClasses).mockResolvedValue([]);
vi.mocked(getOrganizationByEnvironmentId).mockResolvedValue({ id: "org-123" } as any);
vi.mocked(prisma.survey.create).mockResolvedValue({
id: "survey-123",
environmentId,
type: "link",
segment: null,
} as any);
await createSurvey(environmentId, surveyBody);
expect(prisma.survey.create).toHaveBeenCalledWith(
expect.objectContaining({
data: expect.not.objectContaining({ languages: [] }),
})
);
});
test("handles follow-ups properly", async () => {
const environmentId = "env-123";
const surveyBody: TSurveyCreateInput = {
name: "Test Survey",
type: "app",
status: "draft",
questions: [],
followUps: [{ name: "Follow Up 1", trigger: "trigger1", action: "action1" } as any],
};
vi.mocked(getActionClasses).mockResolvedValue([]);
vi.mocked(getOrganizationByEnvironmentId).mockResolvedValue({ id: "org-123" } as any);
vi.mocked(prisma.survey.create).mockResolvedValue({
id: "survey-123",
environmentId,
type: "link",
segment: null,
} as any);
await createSurvey(environmentId, surveyBody);
expect(prisma.survey.create).toHaveBeenCalledWith(
expect.objectContaining({
data: expect.objectContaining({
followUps: {
create: [{ name: "Follow Up 1", trigger: "trigger1", action: "action1" }],
},
}),
})
);
});
test("throws error when organization not found", async () => {
const environmentId = "env-123";
const surveyBody: TSurveyCreateInput = {
name: "Test Survey",
type: "app",
status: "draft",
questions: [],
};
vi.mocked(getActionClasses).mockResolvedValue([]);
vi.mocked(getOrganizationByEnvironmentId).mockResolvedValue(null);
await expect(createSurvey(environmentId, surveyBody)).rejects.toThrow(ResourceNotFoundError);
});
test("handles database errors", async () => {
const environmentId = "env-123";
const surveyBody: TSurveyCreateInput = {
name: "Test Survey",
type: "app",
status: "draft",
questions: [],
};
vi.mocked(getActionClasses).mockResolvedValue([]);
vi.mocked(getOrganizationByEnvironmentId).mockResolvedValue({ id: "org-123" } as any);
const prismaError = new Prisma.PrismaClientKnownRequestError("Database error", {
code: "P2002",
clientVersion: "5.0.0",
});
vi.mocked(prisma.survey.create).mockRejectedValue(prismaError);
await expect(createSurvey(environmentId, surveyBody)).rejects.toThrow(DatabaseError);
expect(logger.error).toHaveBeenCalled();
});
test("re-exports the shared trigger update helper", () => {
expect(handleTriggerUpdates).toBe(handleTriggerUpdatesFromService);
});
describe("handleTriggerUpdates", () => {
test("handles empty triggers", () => {
const result = handleTriggerUpdates(undefined as any, [], []);
expect(result).toEqual({});
});
test("delegates createSurvey to the shared survey service", async () => {
vi.mocked(createSurveyFromService).mockResolvedValueOnce(createdSurvey);
test("adds new triggers", () => {
const updatedTriggers = [
{ actionClass: { id: "action-1" } },
{ actionClass: { id: "action-2" } },
] as any;
const currentTriggers = [] as any;
const actionClasses = [{ id: "action-1" }, { id: "action-2" }] as ActionClass[];
const result = await createSurvey(environmentId, surveyBody);
const result = handleTriggerUpdates(updatedTriggers, currentTriggers, actionClasses);
expect(createSurveyFromService).toHaveBeenCalledWith(environmentId, surveyBody);
expect(result).toBe(createdSurvey);
});
expect(result).toEqual({
create: [{ actionClassId: "action-1" }, { actionClassId: "action-2" }],
});
});
test("propagates service errors", async () => {
const error = new DatabaseError("database error");
vi.mocked(createSurveyFromService).mockRejectedValueOnce(error);
test("removes triggers", () => {
const updatedTriggers = [] as any;
const currentTriggers = [
{ actionClass: { id: "action-1" } },
{ actionClass: { id: "action-2" } },
] as any;
const actionClasses = [{ id: "action-1" }, { id: "action-2" }] as ActionClass[];
const result = handleTriggerUpdates(updatedTriggers, currentTriggers, actionClasses);
expect(result).toEqual({
deleteMany: {
actionClassId: {
in: ["action-1", "action-2"],
},
},
});
});
test("throws error for invalid trigger", () => {
const updatedTriggers = [{ actionClass: { id: "action-3" } }] as any;
const currentTriggers = [] as any;
const actionClasses = [{ id: "action-1" }] as ActionClass[];
expect(() => handleTriggerUpdates(updatedTriggers, currentTriggers, actionClasses)).toThrow(
InvalidInputError
);
});
test("throws error for duplicate triggers", () => {
const updatedTriggers = [
{ actionClass: { id: "action-1" } },
{ actionClass: { id: "action-1" } },
] as any;
const currentTriggers = [] as any;
const actionClasses = [{ id: "action-1" }] as ActionClass[];
expect(() => handleTriggerUpdates(updatedTriggers, currentTriggers, actionClasses)).toThrow(
InvalidInputError
);
});
await expect(createSurvey(environmentId, surveyBody)).rejects.toThrow(error);
});
});

View File

@@ -1,205 +1,11 @@
import { Prisma } from "@prisma/client";
import { prisma } from "@formbricks/database";
import { logger } from "@formbricks/logger";
import { DatabaseError, InvalidInputError, ResourceNotFoundError } from "@formbricks/types/errors";
import { TSurvey, TSurveyCreateInput } from "@formbricks/types/surveys/types";
import {
getOrganizationByEnvironmentId,
subscribeOrganizationMembersToSurveyResponses,
} from "@/lib/organization/service";
import { validateMediaAndPrepareBlocks } from "@/lib/survey/utils";
import { TriggerUpdate } from "@/modules/survey/editor/types/survey-trigger";
import { getActionClasses } from "@/modules/survey/lib/action-class";
import { selectSurvey } from "@/modules/survey/lib/survey";
import { createSurvey as createSurveyFromService, handleTriggerUpdates } from "@/lib/survey/service";
export { handleTriggerUpdates };
/**
 * Creates a survey in the given environment and returns it in the app-level
 * TSurvey shape.
 *
 * Behavior visible here:
 * - Resolves incoming triggers against the environment's action classes via
 *   handleTriggerUpdates (starting from an empty current-trigger list).
 * - Connects the creator when `createdBy` is provided and, after creation,
 *   subscribes organization members to the survey's responses.
 * - For "app" surveys, creates a private segment titled with the survey id
 *   and connects it to the survey.
 *
 * @param environmentId - id of the environment the survey is created in
 * @param surveyBody - survey input payload (may include triggers, followUps, languages, blocks)
 * @returns the created survey with `segment.surveys` flattened to an array of survey ids
 * @throws ResourceNotFoundError when no organization exists for the environment
 * @throws DatabaseError when Prisma reports a known request error
 */
export const createSurvey = async (
  environmentId: string,
  surveyBody: TSurveyCreateInput
): Promise<TSurvey> => {
  try {
    const { createdBy, ...restSurveyBody } = surveyBody;
    // Drop an empty languages array entirely so no nested language write is attempted.
    if (!restSurveyBody.languages?.length) {
      delete restSurveyBody.languages;
    }
    const actionClasses = await getActionClasses(environmentId);
    // @ts-expect-error
    let data: Omit<Prisma.SurveyCreateInput, "environment"> = {
      ...restSurveyBody,
      // TODO: Create with attributeFilters
      triggers: restSurveyBody.triggers
        ? handleTriggerUpdates(restSurveyBody.triggers, [], actionClasses)
        : undefined,
      attributeFilters: undefined,
    };
    if (createdBy) {
      data.creator = {
        connect: {
          id: createdBy,
        },
      };
    }
    const organization = await getOrganizationByEnvironmentId(environmentId);
    if (!organization) {
      throw new ResourceNotFoundError("Organization", null);
    }
    // Survey follow-ups: only emit a nested create when at least one follow-up was provided.
    if (restSurveyBody.followUps?.length) {
      data.followUps = {
        create: restSurveyBody.followUps.map((followUp) => ({
          name: followUp.name,
          trigger: followUp.trigger,
          action: followUp.action,
        })),
      };
    } else {
      delete data.followUps;
    }
    // Validate media and prepare blocks before persisting them.
    if (data.blocks && data.blocks.length > 0) {
      data.blocks = validateMediaAndPrepareBlocks(data.blocks);
    }
    const survey = await prisma.survey.create({
      data: {
        ...data,
        environment: {
          connect: {
            id: environmentId,
          },
        },
      },
      select: selectSurvey,
    });
    // if the survey created is an "app" survey, we also create a private segment for it.
    // NOTE(review): the segment create and the survey update below are separate calls, not
    // wrapped in a transaction — a failure between them leaves an orphaned private segment.
    if (survey.type === "app") {
      const newSegment = await prisma.segment.create({
        data: {
          title: survey.id,
          filters: [],
          isPrivate: true,
          environment: {
            connect: {
              id: environmentId,
            },
          },
        },
      });
      await prisma.survey.update({
        where: {
          id: survey.id,
        },
        data: {
          segment: {
            connect: {
              id: newSegment.id,
            },
          },
        },
      });
    }
    // TODO: Fix this, this happens because the survey type "web" is no longer in the zod types but it's required in the schema for migration
    // @ts-expect-error
    const transformedSurvey: TSurvey = {
      ...survey,
      ...(survey.segment && {
        segment: {
          ...survey.segment,
          // flatten related surveys to ids (the callback's `survey` shadows the outer variable)
          surveys: survey.segment.surveys.map((survey) => survey.id),
        },
      }),
    };
    if (createdBy) {
      await subscribeOrganizationMembersToSurveyResponses(survey.id, createdBy, organization.id);
    }
    return transformedSurvey;
  } catch (error) {
    // Known Prisma request errors are logged and surfaced as DatabaseError; everything else is rethrown.
    if (error instanceof Prisma.PrismaClientKnownRequestError) {
      logger.error(error, "Error creating survey");
      throw new DatabaseError(error.message);
    }
    throw error;
  }
};
/**
 * Extracts the action class ids referenced by a list of survey triggers.
 *
 * Returns null for any falsy input (no triggers provided); otherwise expects
 * an array of objects shaped like `{ actionClass: { id: string } }`.
 *
 * @throws InvalidInputError when the value is not an array or an entry lacks a string action class id.
 */
const getTriggerIds = (triggers: unknown): string[] | null => {
  if (!triggers) return null;
  if (!Array.isArray(triggers)) {
    throw new InvalidInputError("Invalid trigger id");
  }
  const actionClassIds: string[] = [];
  for (const entry of triggers) {
    const candidateId = (entry as { actionClass?: { id?: unknown } })?.actionClass?.id;
    if (typeof candidateId !== "string") {
      throw new InvalidInputError("Invalid trigger id");
    }
    actionClassIds.push(candidateId);
  }
  return actionClassIds;
};
/**
 * Validates that every trigger references a known action class and that no
 * action class is referenced more than once. No-op when no triggers are given.
 *
 * @throws InvalidInputError for an unknown action class id (checked first) or a duplicate id.
 */
const checkTriggersValidity = (triggers: unknown, actionClasses: Array<{ id: string }>) => {
  const triggerIds = getTriggerIds(triggers);
  if (!triggerIds) return;
  // check if all the triggers are valid
  const knownActionClassIds = new Set(actionClasses.map((actionClass) => actionClass.id));
  for (const triggerId of triggerIds) {
    if (!knownActionClassIds.has(triggerId)) {
      throw new InvalidInputError("Invalid trigger id");
    }
  }
  if (new Set(triggerIds).size !== triggerIds.length) {
    throw new InvalidInputError("Duplicate trigger id");
  }
};
/**
 * Builds the Prisma nested-write payload that syncs a survey's triggers from
 * its current state to the requested state.
 *
 * @param updatedTriggers - the desired triggers (validated against actionClasses)
 * @param currentTriggers - the triggers currently stored on the survey
 * @param actionClasses - action classes available in the environment
 * @returns a TriggerUpdate with `create` entries for newly added action classes
 *          and a `deleteMany` filter for removed ones; `{}` when updatedTriggers
 *          is not provided.
 * @throws InvalidInputError when a trigger references an unknown action class
 *         or references the same action class twice.
 */
export const handleTriggerUpdates = (
  updatedTriggers: unknown,
  currentTriggers: unknown,
  actionClasses: Array<{ id: string }>
) => {
  const updatedTriggerIds = getTriggerIds(updatedTriggers);
  if (!updatedTriggerIds) return {};
  checkTriggersValidity(updatedTriggers, actionClasses);
  const currentTriggerIds = getTriggerIds(currentTriggers) ?? [];
  // added triggers are in the new triggers but not in the current triggers
  const addedTriggerIds = updatedTriggerIds.filter((triggerId) => !currentTriggerIds.includes(triggerId));
  // deleted triggers are in the current triggers but not in the new triggers
  const deletedTriggerIds = currentTriggerIds.filter((triggerId) => !updatedTriggerIds.includes(triggerId));
  // Construct the triggers update object
  const triggersUpdate: TriggerUpdate = {};
  if (addedTriggerIds.length > 0) {
    triggersUpdate.create = addedTriggerIds.map((triggerId) => ({
      actionClassId: triggerId,
    }));
  }
  if (deletedTriggerIds.length > 0) {
    // disconnect the public triggers from the survey
    triggersUpdate.deleteMany = {
      actionClassId: {
        in: deletedTriggerIds,
      },
    };
  }
  // Fix: removed a stray, unreachable `return createSurveyFromService(...)` statement
  // that appeared after `return triggersUpdate;` and referenced out-of-scope names.
  return triggersUpdate;
};

View File

@@ -1,837 +1,59 @@
import { ActionClass, Prisma } from "@prisma/client";
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
import { prisma } from "@formbricks/database";
import { DatabaseError, InvalidInputError, ResourceNotFoundError } from "@formbricks/types/errors";
import { TSegment } from "@formbricks/types/segment";
import { TSurvey, TSurveyQuestionTypeEnum } from "@formbricks/types/surveys/types";
import { updateSurveyInternal } from "@/lib/survey/service";
import { getActionClasses } from "@/modules/survey/lib/action-class";
import { getOrganizationAIKeys, getOrganizationIdFromEnvironmentId } from "@/modules/survey/lib/organization";
import { getSurvey } from "@/modules/survey/lib/survey";
import { checkTriggersValidity, handleTriggerUpdates, updateSurvey, updateSurveyDraft } from "./survey";
// Mock dependencies
vi.mock("@formbricks/database", () => ({
prisma: {
survey: {
update: vi.fn(),
},
segment: {
update: vi.fn(),
delete: vi.fn(),
},
},
}));
vi.mock("@/lib/survey/utils", () => ({
checkForInvalidImagesInQuestions: vi.fn(),
}));
import { beforeEach, describe, expect, test, vi } from "vitest";
import { DatabaseError, ResourceNotFoundError } from "@formbricks/types/errors";
import { TSurvey } from "@formbricks/types/surveys/types";
import {
handleTriggerUpdates as handleTriggerUpdatesFromService,
updateSurvey as updateSurveyFromService,
updateSurveyInternal,
} from "@/lib/survey/service";
import { handleTriggerUpdates, updateSurvey, updateSurveyDraft } from "./survey";
vi.mock("@/lib/survey/service", () => ({
handleTriggerUpdates: vi.fn(),
updateSurvey: vi.fn(),
updateSurveyInternal: vi.fn(),
}));
vi.mock("@/modules/survey/lib/action-class", () => ({
getActionClasses: vi.fn(),
}));
describe("survey editor wrappers", () => {
const survey = { id: "survey_1" } as TSurvey;
vi.mock("@/modules/survey/lib/organization", () => ({
getOrganizationIdFromEnvironmentId: vi.fn(),
getOrganizationAIKeys: vi.fn(),
}));
vi.mock("@/modules/survey/lib/survey", () => ({
getSurvey: vi.fn(),
selectSurvey: {
id: true,
createdAt: true,
updatedAt: true,
name: true,
type: true,
environmentId: true,
},
}));
vi.mock("@formbricks/logger", () => ({
logger: {
error: vi.fn(),
},
}));
describe("Survey Editor Library Tests", () => {
afterEach(() => {
beforeEach(() => {
vi.clearAllMocks();
});
describe("updateSurvey", () => {
const mockSurvey = {
id: "survey123",
createdAt: new Date(),
updatedAt: new Date(),
name: "Test Survey",
type: "app",
environmentId: "env123",
createdBy: "user123",
status: "draft",
displayOption: "displayOnce",
questions: [
{
id: "q1",
type: TSurveyQuestionTypeEnum.OpenText,
headline: { default: "Question 1" },
required: false,
inputType: "text",
charLimit: { enabled: false },
},
],
welcomeCard: {
enabled: false,
timeToFinish: true,
showResponseCount: false,
},
triggers: [],
endings: [],
hiddenFields: { enabled: false },
delay: 0,
autoComplete: null,
projectOverwrites: null,
styling: null,
showLanguageSwitch: false,
segment: null,
surveyClosedMessage: null,
singleUse: null,
isVerifyEmailEnabled: false,
recaptcha: null,
isSingleResponsePerEmailEnabled: false,
isBackButtonHidden: false,
pin: null,
displayPercentage: null,
languages: [
{
language: {
id: "en",
code: "en",
createdAt: new Date(),
updatedAt: new Date(),
alias: null,
projectId: "project1",
},
default: true,
enabled: true,
},
],
variables: [],
followUps: [],
} as unknown as TSurvey;
const mockCurrentSurvey = { ...mockSurvey };
const mockActionClasses: ActionClass[] = [
{
id: "action1",
name: "Code Action",
description: "Action from code",
type: "code" as const,
environmentId: "env123",
createdAt: new Date(),
updatedAt: new Date(),
key: null,
noCodeConfig: null,
},
];
const mockOrganizationId = "org123";
const mockOrganization = {
id: mockOrganizationId,
name: "Test Organization",
ownerUserId: "user123",
billing: {
stripeCustomerId: "cust_123",
features: {},
usageCycleAnchor: new Date(),
},
isAIEnabled: false,
};
beforeEach(() => {
vi.mocked(prisma.survey.update).mockResolvedValue(mockSurvey as any);
vi.mocked(prisma.segment.update).mockResolvedValue({
id: "segment1",
environmentId: "env123",
surveys: [{ id: "survey123" }],
} as any);
vi.mocked(getSurvey).mockResolvedValue(mockCurrentSurvey);
vi.mocked(getActionClasses).mockResolvedValue(mockActionClasses);
vi.mocked(getOrganizationIdFromEnvironmentId).mockResolvedValue(mockOrganizationId);
vi.mocked(getOrganizationAIKeys).mockResolvedValue(mockOrganization as any);
});
test("should handle languages update with multiple languages", async () => {
const updatedSurvey: TSurvey = {
...mockSurvey,
languages: [
{
language: {
id: "en",
code: "en",
createdAt: new Date(),
updatedAt: new Date(),
alias: null,
projectId: "project1",
},
default: true,
enabled: true,
},
{
language: {
id: "es",
code: "es",
createdAt: new Date(),
updatedAt: new Date(),
alias: null,
projectId: "project1",
},
default: false,
enabled: true,
},
],
};
await updateSurvey(updatedSurvey);
expect(prisma.survey.update).toHaveBeenCalledWith({
where: { id: "survey123" },
data: expect.objectContaining({
languages: {
updateMany: expect.any(Array),
create: expect.arrayContaining([
expect.objectContaining({
languageId: "es",
default: false,
enabled: true,
}),
]),
},
}),
select: expect.any(Object),
});
});
test("should handle languages update with single default language", async () => {
// This tests the fix for the bug where languages.length === 1 would incorrectly
// set updatedLanguageIds to [] causing the default language to be removed
const updatedSurvey: TSurvey = {
...mockSurvey,
languages: [
{
language: {
id: "en",
code: "en",
createdAt: new Date(),
updatedAt: new Date(),
alias: null,
projectId: "project1",
},
default: true,
enabled: true,
},
],
};
await updateSurvey(updatedSurvey);
// Verify that prisma.survey.update was called
expect(prisma.survey.update).toHaveBeenCalled();
const updateCall = vi.mocked(prisma.survey.update).mock.calls[0][0];
// The key test: when languages.length === 1, we should still process language updates
// and NOT delete the language. Before the fix, languages.length > 1 would fail this case.
expect(updateCall).toBeDefined();
expect(updateCall.where).toEqual({ id: "survey123" });
expect(updateCall.data).toBeDefined();
});
test("should remove all languages when empty array is passed", async () => {
const updatedSurvey: TSurvey = {
...mockSurvey,
languages: [],
};
await updateSurvey(updatedSurvey);
// Verify that prisma.survey.update was called
expect(prisma.survey.update).toHaveBeenCalled();
const updateCall = vi.mocked(prisma.survey.update).mock.calls[0][0];
// When languages is empty array, all existing languages should be removed
expect(updateCall).toBeDefined();
expect(updateCall.where).toEqual({ id: "survey123" });
expect(updateCall.data).toBeDefined();
});
test("should delete private segment for non-app type surveys", async () => {
const mockSegment: TSegment = {
id: "segment1",
title: "Test Segment",
isPrivate: true,
environmentId: "env123",
surveys: ["survey123"],
createdAt: new Date(),
updatedAt: new Date(),
description: null,
filters: [{ id: "filter1" } as any],
};
const updatedSurvey: TSurvey = {
...mockSurvey,
type: "link",
segment: mockSegment,
};
await updateSurvey(updatedSurvey);
expect(prisma.segment.update).toHaveBeenCalledWith({
where: { id: "segment1" },
data: {
surveys: {
disconnect: {
id: "survey123",
},
},
},
});
expect(prisma.segment.delete).toHaveBeenCalledWith({
where: {
id: "segment1",
},
});
});
test("should disconnect public segment for non-app type surveys", async () => {
const mockSegment: TSegment = {
id: "segment1",
title: "Test Segment",
isPrivate: false,
environmentId: "env123",
surveys: ["survey123"],
createdAt: new Date(),
updatedAt: new Date(),
description: null,
filters: [],
};
const updatedSurvey: TSurvey = {
...mockSurvey,
type: "link",
segment: mockSegment,
};
await updateSurvey(updatedSurvey);
expect(prisma.survey.update).toHaveBeenCalledWith({
where: {
id: "survey123",
},
data: {
segment: {
disconnect: true,
},
},
});
});
test("should handle followUps updates", async () => {
const updatedSurvey: TSurvey = {
...mockSurvey,
followUps: [
{
id: "f1",
name: "Existing Follow Up",
createdAt: new Date(),
updatedAt: new Date(),
surveyId: "survey123",
trigger: {
type: "response",
properties: {
endingIds: ["ending1"],
},
},
action: {
type: "send-email",
properties: {
to: "test@example.com",
subject: "Test",
body: "Test body",
from: "test@formbricks.com",
replyTo: ["reply@formbricks.com"],
attachResponseData: false,
},
},
deleted: false,
},
{
id: "f2",
name: "New Follow Up",
createdAt: new Date(),
updatedAt: new Date(),
surveyId: "survey123",
trigger: {
type: "response",
properties: {
endingIds: ["ending1"],
},
},
action: {
type: "send-email",
properties: {
to: "new@example.com",
subject: "New Test",
body: "New test body",
from: "test@formbricks.com",
replyTo: ["reply@formbricks.com"],
attachResponseData: false,
},
},
deleted: false,
},
{
id: "f3",
name: "Follow Up To Delete",
createdAt: new Date(),
updatedAt: new Date(),
surveyId: "survey123",
trigger: {
type: "response",
properties: {
endingIds: ["ending1"],
},
},
action: {
type: "send-email",
properties: {
to: "delete@example.com",
subject: "Delete Test",
body: "Delete test body",
from: "test@formbricks.com",
replyTo: ["reply@formbricks.com"],
attachResponseData: false,
},
},
deleted: true,
},
],
};
// Mock current survey with existing followUps
vi.mocked(getSurvey).mockResolvedValueOnce({
...mockCurrentSurvey,
followUps: [
{
id: "f1",
name: "Existing Follow Up",
trigger: {
type: "response",
properties: {
endingIds: ["ending1"],
},
},
action: {
type: "send-email",
properties: {
to: "test@example.com",
subject: "Test",
body: "Test body",
from: "test@formbricks.com",
replyTo: ["reply@formbricks.com"],
attachResponseData: false,
},
},
},
],
} as any);
await updateSurvey(updatedSurvey);
expect(prisma.survey.update).toHaveBeenCalledWith({
where: { id: "survey123" },
data: expect.objectContaining({
followUps: {
updateMany: [
{
where: {
id: "f1",
},
data: expect.objectContaining({
name: "Existing Follow Up",
}),
},
],
createMany: {
data: [
expect.objectContaining({
name: "New Follow Up",
}),
],
},
deleteMany: [
{
id: "f3",
},
],
},
}),
select: expect.any(Object),
});
});
test("should throw ResourceNotFoundError when survey is not found", async () => {
vi.mocked(getSurvey).mockResolvedValueOnce(null as unknown as TSurvey);
await expect(updateSurvey(mockSurvey)).rejects.toThrow(ResourceNotFoundError);
expect(getSurvey).toHaveBeenCalledWith("survey123");
});
test("should throw ResourceNotFoundError when organization is not found", async () => {
vi.mocked(getOrganizationAIKeys).mockResolvedValueOnce(null);
await expect(updateSurvey(mockSurvey)).rejects.toThrow(ResourceNotFoundError);
});
test("should throw DatabaseError when Prisma throws a known request error", async () => {
const prismaError = new Prisma.PrismaClientKnownRequestError("Database error", {
code: "P2002",
clientVersion: "4.0.0",
});
vi.mocked(prisma.survey.update).mockRejectedValueOnce(prismaError);
await expect(updateSurvey(mockSurvey)).rejects.toThrow(DatabaseError);
});
test("should rethrow other errors", async () => {
const genericError = new Error("Some other error");
vi.mocked(prisma.survey.update).mockRejectedValueOnce(genericError);
await expect(updateSurvey(mockSurvey)).rejects.toThrow(genericError);
});
test("should throw InvalidInputError for invalid segment filters", async () => {
const updatedSurvey: TSurvey = {
...mockSurvey,
segment: {
id: "segment1",
title: "Test Segment",
isPrivate: false,
environmentId: "env123",
surveys: ["survey123"],
createdAt: new Date(),
updatedAt: new Date(),
description: null,
filters: "invalid filters" as any,
},
};
await expect(updateSurvey(updatedSurvey)).rejects.toThrow(InvalidInputError);
});
test("should handle error in segment update", async () => {
vi.mocked(prisma.segment.update).mockRejectedValueOnce(new Error("Error updating survey"));
const updatedSurvey: TSurvey = {
...mockSurvey,
segment: {
id: "segment1",
title: "Test Segment",
isPrivate: false,
environmentId: "env123",
surveys: ["survey123"],
createdAt: new Date(),
updatedAt: new Date(),
description: null,
filters: [],
},
};
await expect(updateSurvey(updatedSurvey)).rejects.toThrow("Error updating survey");
});
test("re-exports the shared trigger update helper", () => {
expect(handleTriggerUpdates).toBe(handleTriggerUpdatesFromService);
});
describe("checkTriggersValidity", () => {
const mockActionClasses: ActionClass[] = [
{
id: "action1",
name: "Action 1",
description: "Test Action 1",
type: "code" as const,
environmentId: "env123",
createdAt: new Date(),
updatedAt: new Date(),
key: null,
noCodeConfig: null,
},
{
id: "action2",
name: "Action 2",
description: "Test Action 2",
type: "noCode" as const,
environmentId: "env123",
createdAt: new Date(),
updatedAt: new Date(),
key: null,
noCodeConfig: null,
},
];
test("delegates updateSurvey to the shared survey service", async () => {
vi.mocked(updateSurveyFromService).mockResolvedValueOnce(survey);
const createFullActionClass = (id: string, type: "code" | "noCode" = "code"): ActionClass => ({
id,
name: `Action ${id}`,
description: `Test Action ${id}`,
type,
environmentId: "env123",
createdAt: new Date(),
updatedAt: new Date(),
key: null,
noCodeConfig: null,
});
const result = await updateSurvey(survey);
test("should not throw error for valid triggers", () => {
const triggers = [
{ actionClass: createFullActionClass("action1") },
{ actionClass: createFullActionClass("action2", "noCode") },
];
expect(() => checkTriggersValidity(triggers as any, mockActionClasses)).not.toThrow();
});
test("should throw error for invalid trigger id", () => {
const triggers = [
{ actionClass: createFullActionClass("action1") },
{ actionClass: createFullActionClass("invalid") },
];
expect(() => checkTriggersValidity(triggers as any, mockActionClasses)).toThrow(InvalidInputError);
expect(() => checkTriggersValidity(triggers as any, mockActionClasses)).toThrow("Invalid trigger id");
});
test("should throw error for duplicate trigger ids", () => {
const triggers = [
{ actionClass: createFullActionClass("action1") },
{ actionClass: createFullActionClass("action1") },
];
expect(() => checkTriggersValidity(triggers as any, mockActionClasses)).toThrow(InvalidInputError);
expect(() => checkTriggersValidity(triggers as any, mockActionClasses)).toThrow("Duplicate trigger id");
});
test("should do nothing when triggers are undefined", () => {
expect(() => checkTriggersValidity(undefined as any, mockActionClasses)).not.toThrow();
});
expect(updateSurveyFromService).toHaveBeenCalledWith(survey);
expect(result).toBe(survey);
});
describe("handleTriggerUpdates", () => {
const mockActionClasses: ActionClass[] = [
{
id: "action1",
name: "Action 1",
description: "Test Action 1",
type: "code" as const,
environmentId: "env123",
createdAt: new Date(),
updatedAt: new Date(),
key: null,
noCodeConfig: null,
},
{
id: "action2",
name: "Action 2",
description: "Test Action 2",
type: "noCode" as const,
environmentId: "env123",
createdAt: new Date(),
updatedAt: new Date(),
key: null,
noCodeConfig: null,
},
{
id: "action3",
name: "Action 3",
description: "Test Action 3",
type: "noCode" as const,
environmentId: "env123",
createdAt: new Date(),
updatedAt: new Date(),
key: null,
noCodeConfig: null,
},
];
test("delegates draft saves to updateSurveyInternal with skipValidation enabled", async () => {
vi.mocked(updateSurveyInternal).mockResolvedValueOnce(survey);
const createActionClassObj = (id: string, type: "code" | "noCode" = "code"): ActionClass => ({
id,
name: `Action ${id}`,
description: `Test Action ${id}`,
type,
environmentId: "env123",
createdAt: new Date(),
updatedAt: new Date(),
key: null,
noCodeConfig: null,
});
const result = await updateSurveyDraft(survey);
test("should return empty object when updatedTriggers is undefined", () => {
const result = handleTriggerUpdates(undefined as any, [], mockActionClasses);
expect(result).toEqual({});
});
test("should identify added triggers correctly", () => {
const currentTriggers = [{ actionClass: createActionClassObj("action1") }];
const updatedTriggers = [
{ actionClass: createActionClassObj("action1") },
{ actionClass: createActionClassObj("action2", "noCode") },
];
const result = handleTriggerUpdates(updatedTriggers as any, currentTriggers as any, mockActionClasses);
expect(result).toEqual({
create: [{ actionClassId: "action2" }],
});
});
test("should identify deleted triggers correctly", () => {
const currentTriggers = [
{ actionClass: createActionClassObj("action1") },
{ actionClass: createActionClassObj("action2", "noCode") },
];
const updatedTriggers = [{ actionClass: createActionClassObj("action1") }];
const result = handleTriggerUpdates(updatedTriggers as any, currentTriggers as any, mockActionClasses);
expect(result).toEqual({
deleteMany: {
actionClassId: {
in: ["action2"],
},
},
});
});
test("should handle both added and deleted triggers", () => {
const currentTriggers = [
{ actionClass: createActionClassObj("action1") },
{ actionClass: createActionClassObj("action2", "noCode") },
];
const updatedTriggers = [
{ actionClass: createActionClassObj("action1") },
{ actionClass: createActionClassObj("action3", "noCode") },
];
const result = handleTriggerUpdates(updatedTriggers as any, currentTriggers as any, mockActionClasses);
expect(result).toEqual({
create: [{ actionClassId: "action3" }],
deleteMany: {
actionClassId: {
in: ["action2"],
},
},
});
});
test("should validate triggers before processing", () => {
const currentTriggers = [{ actionClass: createActionClassObj("action1") }];
const updatedTriggers = [
{ actionClass: createActionClassObj("action1") },
{ actionClass: createActionClassObj("invalid") },
];
expect(() =>
handleTriggerUpdates(updatedTriggers as any, currentTriggers as any, mockActionClasses)
).toThrow(InvalidInputError);
});
expect(updateSurveyInternal).toHaveBeenCalledWith(survey, true);
expect(result).toBe(survey);
});
describe("updateSurveyDraft", () => {
const mockSurvey = {
id: "survey123",
createdAt: new Date(),
updatedAt: new Date(),
name: "Draft Survey",
type: "app",
environmentId: "env123",
createdBy: "user123",
status: "draft",
displayOption: "displayOnce",
questions: [
{
id: "q1",
type: TSurveyQuestionTypeEnum.OpenText,
headline: { default: "Question 1" },
required: false,
inputType: "text",
charLimit: { enabled: false },
},
],
welcomeCard: {
enabled: false,
timeToFinish: true,
showResponseCount: false,
},
triggers: [],
endings: [],
hiddenFields: { enabled: false },
delay: 0,
autoComplete: null,
projectOverwrites: null,
styling: null,
showLanguageSwitch: false,
segment: null,
surveyClosedMessage: null,
singleUse: null,
isVerifyEmailEnabled: false,
recaptcha: null,
isSingleResponsePerEmailEnabled: false,
isBackButtonHidden: false,
pin: null,
displayPercentage: null,
languages: [],
variables: [],
followUps: [],
} as unknown as TSurvey;
test("propagates service errors for updateSurvey", async () => {
const error = new DatabaseError("database error");
vi.mocked(updateSurveyFromService).mockRejectedValueOnce(error);
beforeEach(() => {
vi.mocked(updateSurveyInternal).mockResolvedValue(mockSurvey);
});
await expect(updateSurvey(survey)).rejects.toThrow(error);
});
test("should call updateSurveyInternal with skipValidation=true", async () => {
await updateSurveyDraft(mockSurvey);
test("propagates service errors for updateSurveyDraft", async () => {
const error = new ResourceNotFoundError("Survey", "survey_1");
vi.mocked(updateSurveyInternal).mockRejectedValueOnce(error);
expect(updateSurveyInternal).toHaveBeenCalledWith(mockSurvey, true);
expect(updateSurveyInternal).toHaveBeenCalledTimes(1);
});
test("should return the survey from updateSurveyInternal", async () => {
const result = await updateSurveyDraft(mockSurvey);
expect(result).toEqual(mockSurvey);
});
test("should propagate errors from updateSurveyInternal", async () => {
const error = new Error("Internal update failed");
vi.mocked(updateSurveyInternal).mockRejectedValueOnce(error);
await expect(updateSurveyDraft(mockSurvey)).rejects.toThrow("Internal update failed");
});
test("should propagate ResourceNotFoundError from updateSurveyInternal", async () => {
vi.mocked(updateSurveyInternal).mockRejectedValueOnce(new ResourceNotFoundError("Survey", "survey123"));
await expect(updateSurveyDraft(mockSurvey)).rejects.toThrow(ResourceNotFoundError);
});
test("should propagate DatabaseError from updateSurveyInternal", async () => {
vi.mocked(updateSurveyInternal).mockRejectedValueOnce(new DatabaseError("Database connection failed"));
await expect(updateSurveyDraft(mockSurvey)).rejects.toThrow(DatabaseError);
});
await expect(updateSurveyDraft(survey)).rejects.toThrow(error);
});
});

View File

@@ -1,357 +1,16 @@
import { Prisma } from "@prisma/client";
import { prisma } from "@formbricks/database";
import { logger } from "@formbricks/logger";
import { DatabaseError, InvalidInputError, ResourceNotFoundError } from "@formbricks/types/errors";
import { TSegment, ZSegmentFilters } from "@formbricks/types/segment";
import { TSurvey } from "@formbricks/types/surveys/types";
import { updateSurveyInternal } from "@/lib/survey/service";
import { validateMediaAndPrepareBlocks } from "@/lib/survey/utils";
import { TriggerUpdate } from "@/modules/survey/editor/types/survey-trigger";
import { getActionClasses } from "@/modules/survey/lib/action-class";
import { getOrganizationAIKeys, getOrganizationIdFromEnvironmentId } from "@/modules/survey/lib/organization";
import { getSurvey, selectSurvey } from "@/modules/survey/lib/survey";
import {
handleTriggerUpdates,
updateSurvey as updateSurveyFromService,
updateSurveyInternal,
} from "@/lib/survey/service";
export { handleTriggerUpdates };
/**
 * Persists a draft survey. Draft saves go through the internal update path
 * with validation skipped, so incomplete/in-progress surveys can be stored.
 */
export const updateSurveyDraft = async (updatedSurvey: TSurvey): Promise<TSurvey> =>
  updateSurveyInternal(updatedSurvey, true);
/**
 * Updates a survey and all of its dependent relations (languages, triggers,
 * segment, follow-ups) via a single Prisma `survey.update` call whose nested
 * write payload is assembled incrementally in `data`.
 *
 * @param updatedSurvey - The full survey payload to persist.
 * @returns The updated survey with `segment.surveys` normalized to an id list.
 * @throws ResourceNotFoundError when the survey or its organization is missing.
 * @throws InvalidInputError when segment filters or trigger ids are invalid.
 * @throws DatabaseError when Prisma reports a known request error.
 */
export const updateSurvey = async (updatedSurvey: TSurvey): Promise<TSurvey> => {
  try {
    const surveyId = updatedSurvey.id;
    // Accumulates the nested-write payload for the final prisma.survey.update.
    let data: any = {};

    const actionClasses = await getActionClasses(updatedSurvey.environmentId);
    const currentSurvey = await getSurvey(surveyId);

    if (!currentSurvey) {
      throw new ResourceNotFoundError("Survey", surveyId);
    }

    // Relations are pulled out of the payload; they are written via nested
    // operations below rather than as plain scalar columns.
    const { triggers, environmentId, segment, questions, languages, type, followUps, ...surveyData } =
      updatedSurvey;

    // Validate and prepare blocks for persistence
    if (updatedSurvey.blocks && updatedSurvey.blocks.length > 0) {
      data.blocks = validateMediaAndPrepareBlocks(updatedSurvey.blocks);
    }

    if (languages) {
      // Process languages update logic here
      // Extract currentLanguageIds and updatedLanguageIds
      const currentLanguageIds = currentSurvey.languages
        ? currentSurvey.languages.map((l) => l.language.id)
        : [];
      const updatedLanguageIds =
        languages.length > 0 ? updatedSurvey.languages.map((l) => l.language.id) : [];
      // NOTE(review): disabled languages map to `undefined` here, so the
      // includes() checks below only match enabled ids — confirm intended.
      const enabledLanguageIds = languages.map((language) => {
        if (language.enabled) return language.language.id;
      });

      // Determine languages to add and remove
      const languagesToAdd = updatedLanguageIds.filter((id) => !currentLanguageIds.includes(id));
      const languagesToRemove = currentLanguageIds.filter((id) => !updatedLanguageIds.includes(id));

      const defaultLanguageId = updatedSurvey.languages.find((l) => l.default)?.language.id;

      // Prepare data for Prisma update
      data.languages = {};

      // Update existing languages for default value changes
      data.languages.updateMany = currentSurvey.languages.map((surveyLanguage) => ({
        where: { languageId: surveyLanguage.language.id },
        data: {
          default: surveyLanguage.language.id === defaultLanguageId,
          enabled: enabledLanguageIds.includes(surveyLanguage.language.id),
        },
      }));

      // Add new languages
      if (languagesToAdd.length > 0) {
        data.languages.create = languagesToAdd.map((languageId) => ({
          languageId: languageId,
          default: languageId === defaultLanguageId,
          enabled: enabledLanguageIds.includes(languageId),
        }));
      }

      // Remove languages no longer associated with the survey
      if (languagesToRemove.length > 0) {
        data.languages.deleteMany = languagesToRemove.map((languageId) => ({
          languageId: languageId,
          enabled: enabledLanguageIds.includes(languageId),
        }));
      }
    }

    if (triggers) {
      data.triggers = handleTriggerUpdates(triggers, currentSurvey.triggers, actionClasses);
    }

    // if the survey body has type other than "app" but has a private segment, we delete that segment, and if it has a public segment, we disconnect from to the survey
    if (segment) {
      if (type === "app") {
        // parse the segment filters:
        const parsedFilters = ZSegmentFilters.safeParse(segment.filters);
        if (!parsedFilters.success) {
          throw new InvalidInputError("Invalid user segment filters");
        }

        try {
          // update the segment:
          let updatedInput: Prisma.SegmentUpdateInput = {
            ...segment,
            surveys: undefined,
          };

          if (segment.surveys) {
            updatedInput = {
              ...segment,
              surveys: {
                connect: segment.surveys.map((surveyId) => ({ id: surveyId })),
              },
            };
          }

          await prisma.segment.update({
            where: { id: segment.id },
            data: updatedInput,
            select: {
              surveys: { select: { id: true } },
              environmentId: true,
              id: true,
            },
          });
        } catch (error) {
          logger.error(error, "Error updating survey");
          throw new Error("Error updating survey");
        }
      } else {
        if (segment.isPrivate) {
          // disconnect the private segment first and then delete:
          await prisma.segment.update({
            where: { id: segment.id },
            data: {
              surveys: {
                disconnect: {
                  id: surveyId,
                },
              },
            },
          });

          // delete the private segment:
          await prisma.segment.delete({
            where: {
              id: segment.id,
            },
          });
        } else {
          await prisma.survey.update({
            where: {
              id: surveyId,
            },
            data: {
              segment: {
                disconnect: true,
              },
            },
          });
        }
      }
    } else if (type === "app") {
      // App surveys always need a segment; lazily create a private one named
      // after the survey when none exists yet.
      if (!currentSurvey.segment) {
        await prisma.survey.update({
          where: {
            id: surveyId,
          },
          data: {
            segment: {
              connectOrCreate: {
                where: {
                  environmentId_title: {
                    environmentId,
                    title: surveyId,
                  },
                },
                create: {
                  title: surveyId,
                  isPrivate: true,
                  filters: [],
                  environment: {
                    connect: {
                      id: environmentId,
                    },
                  },
                },
              },
            },
          },
        });
      }
    }

    if (followUps) {
      // Separate follow-ups into categories based on deletion flag
      const deletedFollowUps = followUps.filter((followUp) => followUp.deleted);
      const nonDeletedFollowUps = followUps.filter((followUp) => !followUp.deleted);

      // Get set of existing follow-up IDs from currentSurvey
      const existingFollowUpIds = new Set(currentSurvey.followUps.map((f) => f.id));

      // Separate non-deleted follow-ups into new and existing
      const existingFollowUps = nonDeletedFollowUps.filter((followUp) =>
        existingFollowUpIds.has(followUp.id)
      );
      const newFollowUps = nonDeletedFollowUps.filter((followUp) => !existingFollowUpIds.has(followUp.id));

      data.followUps = {
        // Update existing follow-ups
        updateMany: existingFollowUps.map((followUp) => ({
          where: {
            id: followUp.id,
          },
          data: {
            name: followUp.name,
            trigger: followUp.trigger,
            action: followUp.action,
          },
        })),

        // Create new follow-ups
        createMany:
          newFollowUps.length > 0
            ? {
                data: newFollowUps.map((followUp) => ({
                  id: followUp.id,
                  name: followUp.name,
                  trigger: followUp.trigger,
                  action: followUp.action,
                })),
              }
            : undefined,

        // Delete follow-ups marked as deleted, regardless of whether they exist in DB
        deleteMany:
          deletedFollowUps.length > 0
            ? deletedFollowUps.map((followUp) => ({
                id: followUp.id,
              }))
            : undefined,
      };
    }

    const organizationId = await getOrganizationIdFromEnvironmentId(environmentId);
    const organization = await getOrganizationAIKeys(organizationId);

    if (!organization) {
      throw new ResourceNotFoundError("Organization", null);
    }

    surveyData.updatedAt = new Date();

    data = {
      ...surveyData,
      ...data,
      type,
    };

    // createdBy is immutable after creation; never write it on update.
    delete data.createdBy;

    const prismaSurvey = await prisma.survey.update({
      where: { id: surveyId },
      data,
      select: selectSurvey,
    });

    // Normalize the segment relation: expose related surveys as an id list.
    let surveySegment: TSegment | null = null;
    if (prismaSurvey.segment) {
      surveySegment = {
        ...prismaSurvey.segment,
        surveys: prismaSurvey.segment.surveys.map((survey) => survey.id),
      };
    }

    const modifiedSurvey: TSurvey = {
      ...prismaSurvey, // Properties from prismaSurvey
      displayPercentage: Number(prismaSurvey.displayPercentage) || null,
      segment: surveySegment,
      customHeadScriptsMode: prismaSurvey.customHeadScriptsMode,
    };

    return modifiedSurvey;
  } catch (error) {
    if (error instanceof Prisma.PrismaClientKnownRequestError) {
      logger.error(error, "Error updating survey");
      throw new DatabaseError(error.message);
    }

    throw error;
  }
};
/**
 * Extracts the actionClass ids from a raw triggers value.
 * Returns null for a falsy input; throws InvalidInputError when the value
 * is not an array or any entry lacks a string actionClass id.
 */
const getTriggerIds = (triggers: unknown): string[] | null => {
  if (!triggers) return null;

  if (!Array.isArray(triggers)) {
    throw new InvalidInputError("Invalid trigger id");
  }

  const ids: string[] = [];
  for (const trigger of triggers) {
    const actionClassId = (trigger as { actionClass?: { id?: unknown } })?.actionClass?.id;
    if (typeof actionClassId !== "string") {
      throw new InvalidInputError("Invalid trigger id");
    }
    ids.push(actionClassId);
  }
  return ids;
};
/**
 * Validates a raw triggers value against the environment's action classes.
 * No-op when triggers resolve to null; otherwise throws InvalidInputError
 * for any unknown actionClass id, then for duplicate ids.
 */
export const checkTriggersValidity = (triggers: unknown, actionClasses: Array<{ id: string }>) => {
  const triggerIds = getTriggerIds(triggers);
  if (!triggerIds) return;

  // First pass: every id must belong to a known action class.
  const knownIds = new Set(actionClasses.map((actionClass) => actionClass.id));
  for (const triggerId of triggerIds) {
    if (!knownIds.has(triggerId)) {
      throw new InvalidInputError("Invalid trigger id");
    }
  }

  // Second pass: ids must be unique.
  if (new Set(triggerIds).size !== triggerIds.length) {
    throw new InvalidInputError("Duplicate trigger id");
  }
};
/**
 * Builds the Prisma nested-write payload that reconciles a survey's trigger
 * relations against the desired state.
 *
 * @param updatedTriggers - The desired triggers (raw, validated here).
 * @param currentTriggers - The triggers currently stored on the survey.
 * @param actionClasses - Valid action classes for the survey's environment.
 * @returns A TriggerUpdate with `create` for newly added action classes and
 *          `deleteMany` for removed ones; empty object when no update given.
 * @throws InvalidInputError for unknown or duplicate trigger ids.
 *
 * Fix: removed a stray unreachable `return updateSurveyFromService(updatedSurvey);`
 * line (a diff-merge artifact referencing an out-of-scope identifier) that was
 * spliced in before the closing brace.
 */
export const handleTriggerUpdates = (
  updatedTriggers: unknown,
  currentTriggers: unknown,
  actionClasses: Array<{ id: string }>
) => {
  const updatedTriggerIds = getTriggerIds(updatedTriggers);
  if (!updatedTriggerIds) return {};

  checkTriggersValidity(updatedTriggers, actionClasses);

  const currentTriggerIds = getTriggerIds(currentTriggers) ?? [];

  // added triggers are triggers that are not in the current triggers and are there in the new triggers
  const addedTriggerIds = updatedTriggerIds.filter((triggerId) => !currentTriggerIds.includes(triggerId));

  // deleted triggers are triggers that are not in the new triggers and are there in the current triggers
  const deletedTriggerIds = currentTriggerIds.filter((triggerId) => !updatedTriggerIds.includes(triggerId));

  // Construct the triggers update object
  const triggersUpdate: TriggerUpdate = {};

  if (addedTriggerIds.length > 0) {
    triggersUpdate.create = addedTriggerIds.map((triggerId) => ({
      actionClassId: triggerId,
    }));
  }

  if (deletedTriggerIds.length > 0) {
    // disconnect the public triggers from the survey
    triggersUpdate.deleteMany = {
      actionClassId: {
        in: deletedTriggerIds,
      },
    };
  }

  return triggersUpdate;
};

View File

@@ -16,6 +16,8 @@ export const selectSurvey = {
environmentId: true,
createdBy: true,
status: true,
startsAt: true,
endsAt: true,
welcomeCard: true,
questions: true,
blocks: true,

View File

@@ -7,6 +7,7 @@ import { logger } from "@formbricks/logger";
import { TActionClassType } from "@formbricks/types/action-classes";
import { DatabaseError, ResourceNotFoundError } from "@formbricks/types/errors";
import { getOrganizationByEnvironmentId } from "@/lib/organization/service";
import { deleteSurveyLifecycleJobs } from "@/lib/river/survey-lifecycle";
import { checkForInvalidMediaInBlocks } from "@/lib/survey/utils";
import { validateInputs } from "@/lib/utils/validate";
import { getIsQuotasEnabled } from "@/modules/ee/license-check/lib/utils";
@@ -25,6 +26,8 @@ import {
} from "./survey";
import { surveySelect } from "./survey-record";
vi.mock("server-only", () => ({}));
vi.mock("react", async (importOriginal) => {
const actual = await importOriginal<typeof import("react")>();
return {
@@ -37,6 +40,10 @@ vi.mock("@/lib/survey/utils", () => ({
checkForInvalidMediaInBlocks: vi.fn(() => ({ ok: true, data: undefined })),
}));
vi.mock("@/lib/river/survey-lifecycle", () => ({
deleteSurveyLifecycleJobs: vi.fn(),
}));
vi.mock("@/lib/utils/validate", () => ({
validateInputs: vi.fn(),
}));
@@ -76,6 +83,7 @@ vi.mock("@/lingodotdev/server", () => ({
vi.mock("@formbricks/database", () => ({
prisma: {
$transaction: vi.fn(),
survey: {
findMany: vi.fn(),
findUnique: vi.fn(),
@@ -126,9 +134,11 @@ const resetMocks = () => {
vi.mocked(prisma.survey.count).mockReset();
vi.mocked(prisma.survey.delete).mockReset();
vi.mocked(prisma.survey.create).mockReset();
vi.mocked(prisma.$transaction).mockReset();
vi.mocked(prisma.segment.delete).mockReset();
vi.mocked(prisma.segment.findFirst).mockReset();
vi.mocked(prisma.actionClass.findMany).mockReset();
vi.mocked(deleteSurveyLifecycleJobs).mockReset();
vi.mocked(logger.error).mockClear();
};
@@ -423,6 +433,7 @@ describe("getSurveysSortedByRelevance", () => {
describe("deleteSurvey", () => {
beforeEach(() => {
resetMocks();
vi.mocked(prisma.$transaction).mockImplementation(async (callback: any) => callback(prisma));
});
const mockDeletedSurveyData = {
@@ -438,6 +449,7 @@ describe("deleteSurvey", () => {
const result = await deleteSurvey(surveyId);
expect(result).toBe(true);
expect(deleteSurveyLifecycleJobs).toHaveBeenCalledWith({ tx: prisma, surveyId });
expect(prisma.survey.delete).toHaveBeenCalledWith({
where: { id: surveyId },
select: expect.objectContaining({ id: true, environmentId: true, segment: expect.anything() }),
@@ -454,19 +466,20 @@ describe("deleteSurvey", () => {
await deleteSurvey(surveyId);
expect(deleteSurveyLifecycleJobs).toHaveBeenCalledWith({ tx: prisma, surveyId });
expect(prisma.segment.delete).not.toHaveBeenCalled();
});
test("should throw DatabaseError on Prisma error", async () => {
const prismaError = makePrismaKnownError();
vi.mocked(prisma.survey.delete).mockRejectedValue(prismaError);
vi.mocked(prisma.$transaction).mockRejectedValue(prismaError);
await expect(deleteSurvey(surveyId)).rejects.toThrow(DatabaseError);
expect(logger.error).toHaveBeenCalledWith(prismaError, "Error deleting survey");
});
test("should rethrow unknown error", async () => {
const unknownError = new Error("Unknown error");
vi.mocked(prisma.survey.delete).mockRejectedValue(unknownError);
vi.mocked(prisma.$transaction).mockRejectedValue(unknownError);
await expect(deleteSurvey(surveyId)).rejects.toThrow(unknownError);
});
});

View File

@@ -8,6 +8,7 @@ import { logger } from "@formbricks/logger";
import { DatabaseError, InvalidInputError, ResourceNotFoundError } from "@formbricks/types/errors";
import { TSurveyFilterCriteria } from "@formbricks/types/surveys/types";
import { getOrganizationByEnvironmentId } from "@/lib/organization/service";
import { deleteSurveyLifecycleJobs } from "@/lib/river/survey-lifecycle";
import { checkForInvalidMediaInBlocks } from "@/lib/survey/utils";
import { validateInputs } from "@/lib/utils/validate";
import { getTranslate } from "@/lingodotdev/server";
@@ -147,39 +148,45 @@ export const getSurvey = reactCache(async (surveyId: string): Promise<TSurvey |
export const deleteSurvey = async (surveyId: string): Promise<boolean> => {
try {
const deletedSurvey = await prisma.survey.delete({
where: {
id: surveyId,
},
select: {
id: true,
environmentId: true,
segment: {
select: {
id: true,
isPrivate: true,
},
const deletedSurvey = await prisma.$transaction(async (tx) => {
await deleteSurveyLifecycleJobs({ tx, surveyId });
const removedSurvey = await tx.survey.delete({
where: {
id: surveyId,
},
type: true,
triggers: {
select: {
actionClass: {
select: {
id: true,
select: {
id: true,
environmentId: true,
segment: {
select: {
id: true,
isPrivate: true,
},
},
type: true,
triggers: {
select: {
actionClass: {
select: {
id: true,
},
},
},
},
},
},
});
if (deletedSurvey.type === "app" && deletedSurvey.segment?.isPrivate) {
await prisma.segment.delete({
where: {
id: deletedSurvey.segment.id,
},
});
}
if (removedSurvey.type === "app" && removedSurvey.segment?.isPrivate) {
await tx.segment.delete({
where: {
id: removedSurvey.segment.id,
},
});
}
return removedSurvey;
});
return true;
} catch (error) {

View File

@@ -4,7 +4,8 @@
"private": true,
"workspaces": [
"apps/*",
"packages/*"
"packages/*",
"services/*"
],
"prisma": {
"schema": "packages/database/schema.prisma"

View File

@@ -0,0 +1,213 @@
-- Migration: add survey scheduling columns and install the River job-queue
-- schema (River main migrations 002-006) used by the background worker.

ALTER TABLE "Survey"
ADD COLUMN "startsAt" TIMESTAMP(3),
ADD COLUMN "endsAt" TIMESTAMP(3);

-- A scheduling window must be ordered; either bound may be NULL (open-ended).
ALTER TABLE "Survey"
ADD CONSTRAINT "Survey_startsAt_before_endsAt"
CHECK ("startsAt" IS NULL OR "endsAt" IS NULL OR "startsAt" < "endsAt");

-- All River tables live in a dedicated schema, isolated from the app schema.
CREATE SCHEMA IF NOT EXISTS "river";

-- River main migration 002 [up]
CREATE TYPE river.river_job_state AS ENUM(
  'available',
  'cancelled',
  'completed',
  'discarded',
  'retryable',
  'running',
  'scheduled'
);

-- Core job table: one row per enqueued job.
CREATE TABLE river.river_job(
  id bigserial PRIMARY KEY,
  state river.river_job_state NOT NULL DEFAULT 'available',
  attempt smallint NOT NULL DEFAULT 0,
  max_attempts smallint NOT NULL,
  attempted_at timestamptz,
  created_at timestamptz NOT NULL DEFAULT NOW(),
  finalized_at timestamptz,
  scheduled_at timestamptz NOT NULL DEFAULT NOW(),
  priority smallint NOT NULL DEFAULT 1,
  args jsonb,
  attempted_by text[],
  errors jsonb[],
  kind text NOT NULL,
  metadata jsonb NOT NULL DEFAULT '{}',
  queue text NOT NULL DEFAULT 'default',
  tags varchar(255)[],
  CONSTRAINT finalized_or_finalized_at_null CHECK (
    (state IN ('cancelled', 'completed', 'discarded') AND finalized_at IS NOT NULL) OR finalized_at IS NULL
  ),
  CONSTRAINT max_attempts_is_positive CHECK (max_attempts > 0),
  CONSTRAINT priority_in_range CHECK (priority >= 1 AND priority <= 4),
  CONSTRAINT queue_length CHECK (char_length(queue) > 0 AND char_length(queue) < 128),
  CONSTRAINT kind_length CHECK (char_length(kind) > 0 AND char_length(kind) < 128)
);

CREATE INDEX river_job_kind ON river.river_job USING btree(kind);

CREATE INDEX river_job_state_and_finalized_at_index ON river.river_job USING btree(state, finalized_at)
WHERE finalized_at IS NOT NULL;

-- Supports the worker's prioritized job-fetch query.
CREATE INDEX river_job_prioritized_fetching_index ON river.river_job USING btree(
  state,
  queue,
  priority,
  scheduled_at,
  id
);

CREATE INDEX river_job_args_index ON river.river_job USING GIN(args);
CREATE INDEX river_job_metadata_index ON river.river_job USING GIN(metadata);

-- Notifies listening workers when a job becomes available (dropped again in 004).
CREATE OR REPLACE FUNCTION river.river_job_notify()
RETURNS TRIGGER
AS $$
DECLARE
  payload json;
BEGIN
  IF NEW.state = 'available' THEN
    payload = json_build_object('queue', NEW.queue);
    PERFORM pg_notify('river_insert', payload::text);
  END IF;
  RETURN NULL;
END;
$$
LANGUAGE plpgsql;

CREATE TRIGGER river_notify
AFTER INSERT ON river.river_job
FOR EACH ROW
EXECUTE PROCEDURE river.river_job_notify();

-- Leader election state; UNLOGGED: contents are expendable after a crash.
CREATE UNLOGGED TABLE river.river_leader(
  elected_at timestamptz NOT NULL,
  expires_at timestamptz NOT NULL,
  leader_id text NOT NULL,
  name text PRIMARY KEY,
  CONSTRAINT name_length CHECK (char_length(name) > 0 AND char_length(name) < 128),
  CONSTRAINT leader_id_length CHECK (char_length(leader_id) > 0 AND char_length(leader_id) < 128)
);

-- River main migration 003 [up]
ALTER TABLE river.river_job ALTER COLUMN tags SET DEFAULT '{}';
UPDATE river.river_job SET tags = '{}' WHERE tags IS NULL;
ALTER TABLE river.river_job ALTER COLUMN tags SET NOT NULL;

-- River main migration 004 [up]
ALTER TABLE river.river_job ALTER COLUMN args SET DEFAULT '{}';
UPDATE river.river_job SET args = '{}' WHERE args IS NULL;
ALTER TABLE river.river_job ALTER COLUMN args SET NOT NULL;
ALTER TABLE river.river_job ALTER COLUMN args DROP DEFAULT;

ALTER TABLE river.river_job ALTER COLUMN metadata SET DEFAULT '{}';
UPDATE river.river_job SET metadata = '{}' WHERE metadata IS NULL;
ALTER TABLE river.river_job ALTER COLUMN metadata SET NOT NULL;

ALTER TYPE river.river_job_state ADD VALUE IF NOT EXISTS 'pending' AFTER 'discarded';

-- Tighten the finalized_at invariant: set if and only if the job is finalized.
ALTER TABLE river.river_job DROP CONSTRAINT finalized_or_finalized_at_null;
ALTER TABLE river.river_job ADD CONSTRAINT finalized_or_finalized_at_null CHECK (
  (finalized_at IS NULL AND state NOT IN ('cancelled', 'completed', 'discarded')) OR
  (finalized_at IS NOT NULL AND state IN ('cancelled', 'completed', 'discarded'))
);

DROP TRIGGER river_notify ON river.river_job;
DROP FUNCTION river.river_job_notify;

CREATE TABLE river.river_queue (
  name text PRIMARY KEY NOT NULL,
  created_at timestamptz NOT NULL DEFAULT now(),
  metadata jsonb NOT NULL DEFAULT '{}'::jsonb,
  paused_at timestamptz,
  updated_at timestamptz NOT NULL
);

ALTER TABLE river.river_leader
  ALTER COLUMN name SET DEFAULT 'default',
  DROP CONSTRAINT name_length,
  ADD CONSTRAINT name_length CHECK (name = 'default');

-- River main migration 005 [up]
-- Rebuild river_migration with a (line, version) primary key, preserving rows
-- from any pre-existing single-line table.
DO
$body$
BEGIN
  IF (SELECT to_regclass('river.river_migration') IS NOT NULL) THEN
    ALTER TABLE river.river_migration
      RENAME TO river_migration_old;

    CREATE TABLE river.river_migration(
      line TEXT NOT NULL,
      version bigint NOT NULL,
      created_at timestamptz NOT NULL DEFAULT NOW(),
      CONSTRAINT line_length CHECK (char_length(line) > 0 AND char_length(line) < 128),
      CONSTRAINT version_gte_1 CHECK (version >= 1),
      PRIMARY KEY (line, version)
    );

    INSERT INTO river.river_migration (created_at, line, version)
    SELECT created_at, 'main', version
    FROM river.river_migration_old;

    DROP TABLE river.river_migration_old;
  END IF;
END;
$body$
LANGUAGE 'plpgsql';

ALTER TABLE river.river_job
  ADD COLUMN IF NOT EXISTS unique_key bytea;

CREATE UNIQUE INDEX IF NOT EXISTS river_job_kind_unique_key_idx
  ON river.river_job (kind, unique_key)
  WHERE unique_key IS NOT NULL;

CREATE UNLOGGED TABLE river.river_client (
  id text PRIMARY KEY NOT NULL,
  created_at timestamptz NOT NULL DEFAULT now(),
  metadata jsonb NOT NULL DEFAULT '{}',
  paused_at timestamptz,
  updated_at timestamptz NOT NULL,
  CONSTRAINT name_length CHECK (char_length(id) > 0 AND char_length(id) < 128)
);

CREATE UNLOGGED TABLE river.river_client_queue (
  river_client_id text NOT NULL REFERENCES river.river_client (id) ON DELETE CASCADE,
  name text NOT NULL,
  created_at timestamptz NOT NULL DEFAULT now(),
  max_workers bigint NOT NULL DEFAULT 0,
  metadata jsonb NOT NULL DEFAULT '{}',
  num_jobs_completed bigint NOT NULL DEFAULT 0,
  num_jobs_running bigint NOT NULL DEFAULT 0,
  updated_at timestamptz NOT NULL,
  PRIMARY KEY (river_client_id, name),
  CONSTRAINT name_length CHECK (char_length(name) > 0 AND char_length(name) < 128),
  CONSTRAINT num_jobs_completed_zero_or_positive CHECK (num_jobs_completed >= 0),
  CONSTRAINT num_jobs_running_zero_or_positive CHECK (num_jobs_running >= 0)
);

-- River main migration 006 [up]
-- Maps each job state to one bit of an 8-bit mask, so uniqueness can be
-- enforced only for the states selected in unique_states.
CREATE OR REPLACE FUNCTION river.river_job_state_in_bitmask(bitmask BIT(8), state river.river_job_state)
RETURNS boolean
LANGUAGE SQL
IMMUTABLE
AS $$
  SELECT CASE state
    WHEN 'available' THEN get_bit(bitmask, 7)
    WHEN 'cancelled' THEN get_bit(bitmask, 6)
    WHEN 'completed' THEN get_bit(bitmask, 5)
    WHEN 'discarded' THEN get_bit(bitmask, 4)
    WHEN 'pending' THEN get_bit(bitmask, 3)
    WHEN 'retryable' THEN get_bit(bitmask, 2)
    WHEN 'running' THEN get_bit(bitmask, 1)
    WHEN 'scheduled' THEN get_bit(bitmask, 0)
    ELSE 0
  END = 1;
$$;

ALTER TABLE river.river_job ADD COLUMN IF NOT EXISTS unique_states BIT(8);

-- State-aware unique index replaces the kind-scoped one from migration 005.
CREATE UNIQUE INDEX IF NOT EXISTS river_job_unique_idx ON river.river_job (unique_key)
  WHERE unique_key IS NOT NULL
    AND unique_states IS NOT NULL
    AND river.river_job_state_in_bitmask(unique_states, state);

DROP INDEX river.river_job_kind_unique_key_idx;

View File

@@ -353,6 +353,8 @@ model Survey {
creator User? @relation(fields: [createdBy], references: [id])
createdBy String?
status SurveyStatus @default(draft)
startsAt DateTime?
endsAt DateTime?
/// [SurveyWelcomeCard]
welcomeCard Json @default("{\"enabled\": false}")
/// [SurveyQuestions]

View File

@@ -53,6 +53,8 @@ const ZSurveyBase = z.object({
redirectUrl: z.url().nullable().describe("The URL to redirect to after the survey is completed"),
type: z.enum(SurveyType).describe("The type of the survey"),
status: z.enum(SurveyStatus).describe("The status of the survey"),
startsAt: z.coerce.date().nullable().optional().describe("When the survey should start"),
endsAt: z.coerce.date().nullable().optional().describe("When the survey should end"),
thankYouMessage: z.string().nullable().describe("The thank you message of the survey"),
showLanguageSwitch: z.boolean().nullable().describe("Whether to show the language switch"),
showThankYouMessage: z.boolean().nullable().describe("Whether to show the thank you message"),

View File

@@ -828,6 +828,8 @@ export const ZSurveyBase = z.object({
environmentId: z.string(),
createdBy: z.string().nullable(),
status: ZSurveyStatus,
startsAt: z.coerce.date().nullable().optional(),
endsAt: z.coerce.date().nullable().optional(),
displayOption: ZSurveyDisplayOption,
autoClose: z.number().nullable(),
triggers: z.array(z.object({ actionClass: ZActionClass })),
@@ -930,12 +932,20 @@ export const ZSurveyBase = z.object({
});
export const surveyRefinement = (survey: z.infer<typeof ZSurveyBase>, ctx: z.RefinementCtx): void => {
const { questions, blocks, languages, welcomeCard, endings, isBackButtonHidden } = survey;
const { questions, blocks, languages, welcomeCard, endings, isBackButtonHidden, startsAt, endsAt } = survey;
// Validate: must have questions OR blocks with elements, not both
const hasQuestions = questions.length > 0;
const hasBlocks = blocks.length > 0 && blocks.some((b) => b.elements.length > 0);
if (startsAt && endsAt && startsAt >= endsAt) {
ctx.addIssue({
code: "custom",
message: "Survey start date must be before end date",
path: ["startsAt"],
});
}
if (!hasQuestions && !hasBlocks) {
ctx.addIssue({
code: "custom",

View File

@@ -1,6 +1,7 @@
packages:
- "apps/*"
- "packages/*"
- "services/*"
# Allow lifecycle scripts for packages that need to build native binaries
# Required for pnpm v10+ which blocks scripts by default

View File

@@ -0,0 +1,51 @@
package main
import (
"context"
"log/slog"
"os"
"os/signal"
"syscall"
"time"
"github.com/formbricks/formbricks/services/river-poc-worker/internal/config"
"github.com/formbricks/formbricks/services/river-poc-worker/internal/riverapp"
)
// main is the entrypoint for the river-poc-worker binary: it loads
// configuration from the environment, starts the River client, blocks
// until SIGINT/SIGTERM, then performs a bounded graceful shutdown.
func main() {
	logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))

	cfg, err := config.LoadFromEnv()
	if err != nil {
		logger.Error("failed to load configuration", slog.Any("error", err))
		os.Exit(1)
	}

	// ctx is cancelled when the process receives Ctrl-C (SIGINT) or SIGTERM.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	app, err := riverapp.New(ctx, cfg, logger)
	if err != nil {
		logger.Error("failed to initialize River application", slog.Any("error", err))
		os.Exit(1)
	}

	if err := app.Start(ctx); err != nil {
		logger.Error("failed to start River application", slog.Any("error", err))
		os.Exit(1)
	}
	logger.Info("river-poc-worker started")

	// Block until a shutdown signal cancels the context.
	<-ctx.Done()

	// Allow up to 10 seconds for the River client to stop before giving up.
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := app.Stop(shutdownCtx); err != nil {
		logger.Error("failed to stop River application cleanly", slog.Any("error", err))
		os.Exit(1)
	}
	logger.Info("river-poc-worker stopped")
}

View File

@@ -0,0 +1,38 @@
mode: atomic
github.com/formbricks/formbricks/services/river-poc-worker/internal/config/config.go:30.36,32.23 2 2
github.com/formbricks/formbricks/services/river-poc-worker/internal/config/config.go:32.23,34.3 1 1
github.com/formbricks/formbricks/services/river-poc-worker/internal/config/config.go:36.2,39.8 1 1
github.com/formbricks/formbricks/services/river-poc-worker/internal/config/config.go:42.53,44.42 2 1
github.com/formbricks/formbricks/services/river-poc-worker/internal/config/config.go:44.42,46.3 1 0
github.com/formbricks/formbricks/services/river-poc-worker/internal/config/config.go:48.2,49.51 2 1
github.com/formbricks/formbricks/services/river-poc-worker/internal/config/config.go:49.51,51.3 1 6
github.com/formbricks/formbricks/services/river-poc-worker/internal/config/config.go:53.2,54.27 2 1
github.com/formbricks/formbricks/services/river-poc-worker/internal/workers/survey_lifecycle.go:26.38,26.64 1 1
github.com/formbricks/formbricks/services/river-poc-worker/internal/workers/survey_lifecycle.go:32.36,32.60 1 1
github.com/formbricks/formbricks/services/river-poc-worker/internal/workers/survey_lifecycle.go:44.67,46.2 1 2
github.com/formbricks/formbricks/services/river-poc-worker/internal/workers/survey_lifecycle.go:48.63,50.2 1 2
github.com/formbricks/formbricks/services/river-poc-worker/internal/workers/survey_lifecycle.go:52.67,55.2 2 1
github.com/formbricks/formbricks/services/river-poc-worker/internal/workers/survey_lifecycle.go:57.94,60.2 2 1
github.com/formbricks/formbricks/services/river-poc-worker/internal/workers/survey_lifecycle.go:62.90,65.2 2 1
github.com/formbricks/formbricks/services/river-poc-worker/internal/workers/survey_lifecycle.go:73.3,82.2 1 2
github.com/formbricks/formbricks/services/river-poc-worker/cmd/river-poc-worker/main.go:15.13,19.16 3 0
github.com/formbricks/formbricks/services/river-poc-worker/cmd/river-poc-worker/main.go:19.16,22.3 2 0
github.com/formbricks/formbricks/services/river-poc-worker/cmd/river-poc-worker/main.go:24.2,28.16 4 0
github.com/formbricks/formbricks/services/river-poc-worker/cmd/river-poc-worker/main.go:28.16,31.3 2 0
github.com/formbricks/formbricks/services/river-poc-worker/cmd/river-poc-worker/main.go:33.2,33.39 1 0
github.com/formbricks/formbricks/services/river-poc-worker/cmd/river-poc-worker/main.go:33.39,36.3 2 0
github.com/formbricks/formbricks/services/river-poc-worker/cmd/river-poc-worker/main.go:38.2,45.46 5 0
github.com/formbricks/formbricks/services/river-poc-worker/cmd/river-poc-worker/main.go:45.46,48.3 2 0
github.com/formbricks/formbricks/services/river-poc-worker/cmd/river-poc-worker/main.go:50.2,50.41 1 0
github.com/formbricks/formbricks/services/river-poc-worker/internal/riverapp/app.go:22.85,24.2 1 0
github.com/formbricks/formbricks/services/river-poc-worker/internal/riverapp/app.go:31.17,33.16 2 1
github.com/formbricks/formbricks/services/river-poc-worker/internal/riverapp/app.go:33.16,35.3 1 0
github.com/formbricks/formbricks/services/river-poc-worker/internal/riverapp/app.go:37.2,51.16 4 1
github.com/formbricks/formbricks/services/river-poc-worker/internal/riverapp/app.go:51.16,54.3 2 0
github.com/formbricks/formbricks/services/river-poc-worker/internal/riverapp/app.go:56.2,59.8 1 1
github.com/formbricks/formbricks/services/river-poc-worker/internal/riverapp/app.go:62.48,63.44 1 1
github.com/formbricks/formbricks/services/river-poc-worker/internal/riverapp/app.go:63.44,65.3 1 0
github.com/formbricks/formbricks/services/river-poc-worker/internal/riverapp/app.go:67.2,67.12 1 1
github.com/formbricks/formbricks/services/river-poc-worker/internal/riverapp/app.go:70.47,73.43 2 1
github.com/formbricks/formbricks/services/river-poc-worker/internal/riverapp/app.go:73.43,75.3 1 0
github.com/formbricks/formbricks/services/river-poc-worker/internal/riverapp/app.go:77.2,77.12 1 1

View File

@@ -0,0 +1,29 @@
module github.com/formbricks/formbricks/services/river-poc-worker
go 1.25.1
require (
github.com/jackc/pgx/v5 v5.9.1
github.com/riverqueue/river v0.32.0
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.32.0
)
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/riverqueue/river/riverdriver v0.32.0 // indirect
github.com/riverqueue/river/rivershared v0.32.0 // indirect
github.com/riverqueue/river/rivertype v0.32.0 // indirect
github.com/stretchr/testify v1.11.1 // indirect
github.com/tidwall/gjson v1.18.0 // indirect
github.com/tidwall/match v1.2.0 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
github.com/tidwall/sjson v1.2.5 // indirect
go.uber.org/goleak v1.3.0 // indirect
golang.org/x/sync v0.20.0 // indirect
golang.org/x/text v0.35.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

View File

@@ -0,0 +1,61 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 h1:Dj0L5fhJ9F82ZJyVOmBx6msDp/kfd1t9GRfny/mfJA0=
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.9.1 h1:uwrxJXBnx76nyISkhr33kQLlUqjv7et7b9FjCen/tdc=
github.com/jackc/pgx/v5 v5.9.1/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/riverqueue/river v0.32.0 h1:j15EoFZ4oQWXcCq8NyzWwoi3fdaO8mECTB100NSv9Qw=
github.com/riverqueue/river v0.32.0/go.mod h1:zABAdLze3HI7K02N+veikXyK5FjiLzjimnQpZ1Duyng=
github.com/riverqueue/river/riverdriver v0.32.0 h1:AG6a2hNVOIGLx/+3IRtbwofJRYEI7xqnVVxULe9s4Lg=
github.com/riverqueue/river/riverdriver v0.32.0/go.mod h1:FRDMuqnLOsakeJOHlozKK+VH7W7NLp+6EToxQ2JAjBE=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.32.0 h1:CqrRxxcdA/0sHkxLNldsQff9DIG5qxn2EJO09Pau3w0=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.32.0/go.mod h1:j45UPpbMpcI10m+huTeNUaOwzoLJcEg0K6ihWXWeOec=
github.com/riverqueue/river/rivershared v0.32.0 h1:7DwdrppMU9uoU2iU9aGQiv91nBezjlcI85NV4PmnLHw=
github.com/riverqueue/river/rivershared v0.32.0/go.mod h1:UE7GEj3zaTV3cKw7Q3angCozlNEGsL50xZBKJQ9m6zU=
github.com/riverqueue/river/rivertype v0.32.0 h1:RW7uodfl86gYkjwDponTAPNnUqM+X6BjlsNHxbt6Ztg=
github.com/riverqueue/river/rivertype v0.32.0/go.mod h1:D1Ad+EaZiaXbQbJcJcfeicXJMBKno0n6UcfKI5Q7DIQ=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/match v1.2.0 h1:0pt8FlkOwjN2fPt4bIl4BoNxb98gGHN2ObFEDkrfZnM=
github.com/tidwall/match v1.2.0/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY=
github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8=
golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -0,0 +1,55 @@
package config
import (
"errors"
"net/url"
"os"
"strings"
)
const (
	// DefaultRiverSchema is the Postgres schema holding River's tables;
	// it matches the RIVER_SCHEMA constant on the TypeScript side.
	DefaultRiverSchema = "river"
)

// ErrDatabaseURLRequired is returned by LoadFromEnv when DATABASE_URL
// is unset or blank.
var ErrDatabaseURLRequired = errors.New("DATABASE_URL is required")

// prismaUnsupportedQueryParams lists DATABASE_URL query parameters used
// by Prisma that are stripped before the URL is handed to the pgx pool
// (presumably because pgx rejects or ignores them — confirm per driver).
var prismaUnsupportedQueryParams = []string{
	"connection_limit",
	"pgbouncer",
	"pool_timeout",
	"schema",
	"socket_timeout",
	"statement_cache_size",
}

// Config holds the runtime configuration for the worker.
type Config struct {
	DatabaseURL string // sanitized Postgres connection string
	RiverSchema string // Postgres schema River's migrations were applied to
}
// LoadFromEnv builds the worker configuration from process environment
// variables. DATABASE_URL is mandatory; its value is trimmed and then
// stripped of Prisma-only query parameters. The River schema is always
// DefaultRiverSchema.
func LoadFromEnv() (Config, error) {
	rawURL := strings.TrimSpace(os.Getenv("DATABASE_URL"))
	if rawURL == "" {
		return Config{}, ErrDatabaseURLRequired
	}

	cfg := Config{
		DatabaseURL: sanitizeDatabaseURL(rawURL),
		RiverSchema: DefaultRiverSchema,
	}
	return cfg, nil
}
// sanitizeDatabaseURL removes query parameters that Prisma understands
// but that this worker's Postgres driver should not see. Inputs that do
// not parse as an absolute URL are returned unchanged so the driver can
// surface its own error later.
func sanitizeDatabaseURL(databaseURL string) string {
	parsed, err := url.Parse(databaseURL)
	if err != nil || parsed.Scheme == "" {
		return databaseURL
	}

	params := parsed.Query()
	for _, unsupported := range prismaUnsupportedQueryParams {
		params.Del(unsupported)
	}
	parsed.RawQuery = params.Encode()

	return parsed.String()
}

View File

@@ -0,0 +1,34 @@
package config
import (
	"errors"
	"testing"
)
// TestLoadFromEnv covers the two observable behaviors of LoadFromEnv:
// rejecting a missing DATABASE_URL and sanitizing a Prisma-style URL.
func TestLoadFromEnv(t *testing.T) {
	t.Run("returns an error when DATABASE_URL is missing", func(t *testing.T) {
		t.Setenv("DATABASE_URL", "")

		_, err := LoadFromEnv()

		// errors.Is keeps this assertion valid even if LoadFromEnv ever
		// starts wrapping the sentinel error; direct != comparison would
		// silently break in that case.
		if !errors.Is(err, ErrDatabaseURLRequired) {
			t.Fatalf("expected ErrDatabaseURLRequired, got %v", err)
		}
	})

	t.Run("loads database configuration from environment", func(t *testing.T) {
		// Leading/trailing whitespace must be trimmed, Prisma-only params
		// (connection_limit, schema) stripped, generic params (sslmode) kept.
		t.Setenv(
			"DATABASE_URL",
			" postgres://formbricks:password@localhost:5432/formbricks?connection_limit=5&schema=formbricks&sslmode=disable ",
		)

		cfg, err := LoadFromEnv()
		if err != nil {
			t.Fatalf("expected no error, got %v", err)
		}
		if got, want := cfg.DatabaseURL, "postgres://formbricks:password@localhost:5432/formbricks?sslmode=disable"; got != want {
			t.Fatalf("expected DatabaseURL %q, got %q", want, got)
		}
		if got, want := cfg.RiverSchema, DefaultRiverSchema; got != want {
			t.Fatalf("expected RiverSchema %q, got %q", want, got)
		}
	})
}

View File

@@ -0,0 +1,78 @@
package riverapp
import (
"context"
"fmt"
"log/slog"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/formbricks/formbricks/services/river-poc-worker/internal/config"
"github.com/formbricks/formbricks/services/river-poc-worker/internal/workers"
)
// App bundles the River client with the pgx pool backing it so both can
// be started and torn down together.
type App struct {
	client *river.Client[pgx.Tx]
	pool   *pgxpool.Pool
}

// New constructs a production App; see newWithOptions for the wiring.
func New(ctx context.Context, cfg config.Config, logger *slog.Logger) (*App, error) {
	return newWithOptions(ctx, cfg, logger, false)
}
// newWithOptions creates the pgx pool, registers the survey lifecycle
// workers, and builds the River client on top. testOnly is forwarded to
// river.Config.TestOnly (used by the integration test). If client
// construction fails after the pool exists, the pool is closed before
// returning so no connections leak.
func newWithOptions(
	ctx context.Context,
	cfg config.Config,
	logger *slog.Logger,
	testOnly bool,
) (*App, error) {
	pool, err := pgxpool.New(ctx, cfg.DatabaseURL)
	if err != nil {
		return nil, fmt.Errorf("create pgx pool: %w", err)
	}

	workerRegistry := river.NewWorkers()
	workers.Register(workerRegistry, logger)

	client, err := river.NewClient(riverpgxv5.New(pool), &river.Config{
		Logger: logger,
		Schema: cfg.RiverSchema,
		Queues: map[string]river.QueueConfig{
			workers.QueueName: {
				// Small fixed concurrency for the POC workload.
				MaxWorkers: 2,
			},
		},
		TestOnly: testOnly,
		Workers:  workerRegistry,
	})
	if err != nil {
		pool.Close()
		return nil, fmt.Errorf("create river client: %w", err)
	}

	return &App{
		client: client,
		pool:   pool,
	}, nil
}
// Start begins River job processing, wrapping any client error with
// context for the caller's logs.
func (a *App) Start(ctx context.Context) error {
	err := a.client.Start(ctx)
	if err == nil {
		return nil
	}
	return fmt.Errorf("start river client: %w", err)
}
// Stop shuts down the River client and then closes the connection pool.
// The pool is closed via defer so it is released even when the client
// fails to stop cleanly.
func (a *App) Stop(ctx context.Context) error {
	defer a.pool.Close()

	err := a.client.Stop(ctx)
	if err == nil {
		return nil
	}
	return fmt.Errorf("stop river client: %w", err)
}

View File

@@ -0,0 +1,139 @@
package riverapp
import (
"bytes"
"context"
"fmt"
"log/slog"
"net"
"net/url"
"strings"
"testing"
"time"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivermigrate"
"github.com/formbricks/formbricks/services/river-poc-worker/internal/config"
"github.com/formbricks/formbricks/services/river-poc-worker/internal/workers"
)
// TestAppProcessesSurveyLifecycleJobsFromSQL is an end-to-end check that
// a job inserted directly via SQL (mirroring how a non-Go producer such
// as the web app would enqueue) is picked up and logged by the worker.
// Skipped unless DATABASE_URL is set and Postgres is reachable.
func TestAppProcessesSurveyLifecycleJobsFromSQL(t *testing.T) {
	cfg, err := config.LoadFromEnv()
	if err != nil {
		t.Skip("DATABASE_URL is required for integration tests")
	}
	if !canReachPostgres(t, cfg.DatabaseURL) {
		t.Skip("Postgres is not reachable for integration tests")
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	pool, err := pgxpool.New(ctx, cfg.DatabaseURL)
	if err != nil {
		t.Fatalf("create pgx pool: %v", err)
	}
	defer pool.Close()

	// A unique throwaway schema isolates this run from the real "river"
	// schema and from concurrent test runs.
	schema := fmt.Sprintf("river_poc_test_%d", time.Now().UnixNano())
	if _, err := pool.Exec(ctx, fmt.Sprintf(`CREATE SCHEMA "%s"`, schema)); err != nil {
		t.Fatalf("create schema: %v", err)
	}
	t.Cleanup(func() {
		dropCtx, dropCancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer dropCancel()
		_, _ = pool.Exec(dropCtx, fmt.Sprintf(`DROP SCHEMA IF EXISTS "%s" CASCADE`, schema))
	})

	// Apply River's own migrations into the throwaway schema.
	migrator, err := rivermigrate.New(riverpgxv5.New(pool), &rivermigrate.Config{Schema: schema})
	if err != nil {
		t.Fatalf("create migrator: %v", err)
	}
	if _, err := migrator.Migrate(ctx, rivermigrate.DirectionUp, nil); err != nil {
		t.Fatalf("migrate river schema: %v", err)
	}

	// Capture the worker's slog output in memory so structured fields
	// can be asserted on below.
	var logs bytes.Buffer
	logger := slog.New(slog.NewTextHandler(&logs, &slog.HandlerOptions{Level: slog.LevelInfo}))

	app, err := newWithOptions(ctx, config.Config{
		DatabaseURL: cfg.DatabaseURL,
		RiverSchema: schema,
	}, logger, true)
	if err != nil {
		t.Fatalf("create app: %v", err)
	}
	defer func() {
		stopCtx, stopCancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer stopCancel()
		_ = app.Stop(stopCtx)
	}()
	if err := app.Start(ctx); err != nil {
		t.Fatalf("start app: %v", err)
	}

	// Insert the job as raw SQL into river_job, then pg_notify on the
	// "<schema>.river_insert" channel so the client wakes up without
	// waiting for a poll interval.
	payload := `{"surveyId":"survey_1","environmentId":"env_1","scheduledFor":"2026-04-01T12:00:00.000Z"}`
	insertQuery := fmt.Sprintf(`
INSERT INTO "%s"."river_job" (args, kind, max_attempts, queue)
VALUES ($1::jsonb, $2, $3, $4)
`, schema)
	if _, err := pool.Exec(ctx, insertQuery, payload, workers.SurveyStartKind, 3, workers.QueueName); err != nil {
		t.Fatalf("insert job: %v", err)
	}
	if _, err := pool.Exec(ctx, `SELECT pg_notify($1, $2)`, fmt.Sprintf(`%s.river_insert`, schema), `{"queue":"survey_lifecycle"}`); err != nil {
		t.Fatalf("notify river queue: %v", err)
	}

	// Poll the captured logs until the worker reports the job, or fail
	// after 10 seconds.
	deadline := time.Now().Add(10 * time.Second)
	for {
		output := logs.String()
		if strings.Contains(output, "STARTING SURVEY") {
			if !strings.Contains(output, "job_kind="+workers.SurveyStartKind) {
				t.Fatalf("expected log output to contain %q, got %q", workers.SurveyStartKind, output)
			}
			if !strings.Contains(output, "survey_id=survey_1") {
				t.Fatalf("expected log output to contain survey_id, got %q", output)
			}
			if !strings.Contains(output, "environment_id=env_1") {
				t.Fatalf("expected log output to contain environment_id, got %q", output)
			}
			if !strings.Contains(output, "scheduled_for=2026-04-01T12:00:00.000Z") {
				t.Fatalf("expected log output to contain scheduled_for, got %q", output)
			}
			return
		}
		if time.Now().After(deadline) {
			t.Fatalf("timed out waiting for lifecycle job to be processed; logs=%q", output)
		}
		time.Sleep(100 * time.Millisecond)
	}
}
// canReachPostgres reports whether a TCP connection to the host/port in
// databaseURL can be established within 500ms; the integration test uses
// it to skip itself when Postgres is unavailable.
func canReachPostgres(t *testing.T, databaseURL string) bool {
	t.Helper()

	parsed, err := url.Parse(databaseURL)
	if err != nil {
		return false
	}

	port := parsed.Port()
	if port == "" {
		port = "5432" // default Postgres port when the URL omits one
	}
	address := net.JoinHostPort(parsed.Hostname(), port)

	conn, dialErr := net.DialTimeout("tcp", address, 500*time.Millisecond)
	if dialErr != nil {
		return false
	}
	defer conn.Close()

	return true
}

View File

@@ -0,0 +1,82 @@
package workers
import (
"context"
"log/slog"
"github.com/riverqueue/river"
)
// Queue and job-kind identifiers. These string values are shared with
// the TypeScript producer (RIVER_SURVEY_LIFECYCLE_QUEUE and the
// RIVER_SURVEY_*_KIND constants) and must stay in sync across both
// codebases.
const (
	QueueName = "survey_lifecycle"

	SurveyStartKind = "survey.start"
	SurveyEndKind   = "survey.end"
)

// SurveyLifecyclePayload mirrors the JSON args inserted when scheduling
// a survey start/end job.
type SurveyLifecyclePayload struct {
	SurveyID      string `json:"surveyId"`
	EnvironmentID string `json:"environmentId"`
	ScheduledFor  string `json:"scheduledFor"` // timestamp kept as a string; never parsed here
}

// SurveyStartArgs are the River job args for a survey start job.
type SurveyStartArgs struct {
	SurveyLifecyclePayload
}

// Kind identifies the job type stored in the river_job table.
func (SurveyStartArgs) Kind() string { return SurveyStartKind }

// SurveyEndArgs are the River job args for a survey end job.
type SurveyEndArgs struct {
	SurveyLifecyclePayload
}

// Kind identifies the job type stored in the river_job table.
func (SurveyEndArgs) Kind() string { return SurveyEndKind }

// SurveyStartWorker processes survey.start jobs (POC: logging only).
type SurveyStartWorker struct {
	river.WorkerDefaults[SurveyStartArgs]
	logger *slog.Logger
}

// SurveyEndWorker processes survey.end jobs (POC: logging only).
type SurveyEndWorker struct {
	river.WorkerDefaults[SurveyEndArgs]
	logger *slog.Logger
}

// NewSurveyStartWorker returns a start worker that logs via logger.
func NewSurveyStartWorker(logger *slog.Logger) *SurveyStartWorker {
	return &SurveyStartWorker{logger: logger}
}

// NewSurveyEndWorker returns an end worker that logs via logger.
func NewSurveyEndWorker(logger *slog.Logger) *SurveyEndWorker {
	return &SurveyEndWorker{logger: logger}
}
// Register adds both survey lifecycle workers to the given registry so
// a River client built with it can process survey.start and survey.end
// jobs.
func Register(workerRegistry *river.Workers, logger *slog.Logger) {
	river.AddWorker(workerRegistry, NewSurveyStartWorker(logger))
	river.AddWorker(workerRegistry, NewSurveyEndWorker(logger))
}
// Work handles a survey.start job. POC behavior: it only emits a
// structured log entry; no survey state is mutated here.
func (w *SurveyStartWorker) Work(ctx context.Context, job *river.Job[SurveyStartArgs]) error {
	logLifecycle(ctx, w.logger, "STARTING SURVEY", SurveyStartKind, job.Args.SurveyLifecyclePayload)
	return nil
}

// Work handles a survey.end job. POC behavior: it only emits a
// structured log entry; no survey state is mutated here.
func (w *SurveyEndWorker) Work(ctx context.Context, job *river.Job[SurveyEndArgs]) error {
	logLifecycle(ctx, w.logger, "ENDING SURVEY", SurveyEndKind, job.Args.SurveyLifecyclePayload)
	return nil
}
// logLifecycle emits one structured info entry for a lifecycle job,
// carrying the job kind and every payload field as slog attributes.
func logLifecycle(
	ctx context.Context,
	logger *slog.Logger,
	message string,
	kind string,
	payload SurveyLifecyclePayload,
) {
	attrs := []any{
		slog.String("job_kind", kind),
		slog.String("survey_id", payload.SurveyID),
		slog.String("environment_id", payload.EnvironmentID),
		slog.String("scheduled_for", payload.ScheduledFor),
	}
	logger.InfoContext(ctx, message, attrs...)
}

View File

@@ -0,0 +1,107 @@
package workers
import (
"bytes"
"context"
"log/slog"
"reflect"
"strings"
"testing"
"github.com/riverqueue/river"
)
// TestRegister asserts that both survey lifecycle workers are present in
// the registry after Register runs.
// NOTE(review): this inspects river.Workers' unexported workersMap field
// via reflection because no public lookup is used here; it will break if
// River renames that field. Confirm there is no public alternative
// before relying on this long term.
func TestRegister(t *testing.T) {
	workerRegistry := river.NewWorkers()
	Register(workerRegistry, slog.New(slog.NewTextHandler(&bytes.Buffer{}, nil)))

	workersMap := reflect.ValueOf(workerRegistry).Elem().FieldByName("workersMap")
	if !workersMap.IsValid() {
		t.Fatal("expected workers registry to expose a workersMap field")
	}
	if !workersMap.MapIndex(reflect.ValueOf(SurveyStartKind)).IsValid() {
		t.Fatalf("expected %q worker to be registered", SurveyStartKind)
	}
	if !workersMap.MapIndex(reflect.ValueOf(SurveyEndKind)).IsValid() {
		t.Fatalf("expected %q worker to be registered", SurveyEndKind)
	}
}
// TestSurveyLifecycleWorkers verifies that each worker's Work method
// succeeds and logs the expected message plus every structured payload
// field. Table-driven so the start and end workers share assertions.
func TestSurveyLifecycleWorkers(t *testing.T) {
	t.Parallel()

	testCases := []struct {
		name            string
		work            func(context.Context, *slog.Logger) error
		expectedMessage string
		expectedKind    string
	}{
		{
			name: "start worker logs structured survey start event",
			work: func(ctx context.Context, logger *slog.Logger) error {
				return NewSurveyStartWorker(logger).Work(ctx, &river.Job[SurveyStartArgs]{
					Args: SurveyStartArgs{
						SurveyLifecyclePayload: SurveyLifecyclePayload{
							SurveyID:      "survey_start_1",
							EnvironmentID: "env_1",
							ScheduledFor:  "2026-04-01T12:00:00.000Z",
						},
					},
				})
			},
			expectedMessage: "STARTING SURVEY",
			expectedKind:    SurveyStartKind,
		},
		{
			name: "end worker logs structured survey end event",
			work: func(ctx context.Context, logger *slog.Logger) error {
				return NewSurveyEndWorker(logger).Work(ctx, &river.Job[SurveyEndArgs]{
					Args: SurveyEndArgs{
						SurveyLifecyclePayload: SurveyLifecyclePayload{
							SurveyID:      "survey_end_1",
							EnvironmentID: "env_2",
							ScheduledFor:  "2026-04-02T12:00:00.000Z",
						},
					},
				})
			},
			expectedMessage: "ENDING SURVEY",
			expectedKind:    SurveyEndKind,
		},
	}

	for _, tt := range testCases {
		t.Run(tt.name, func(t *testing.T) {
			// Capture slog text output in memory for the assertions below.
			var logs bytes.Buffer
			logger := slog.New(slog.NewTextHandler(&logs, &slog.HandlerOptions{Level: slog.LevelInfo}))

			if err := tt.work(context.Background(), logger); err != nil {
				t.Fatalf("expected no error, got %v", err)
			}

			output := logs.String()
			if !strings.Contains(output, tt.expectedMessage) {
				t.Fatalf("expected log output to contain %q, got %q", tt.expectedMessage, output)
			}
			if !strings.Contains(output, "job_kind="+tt.expectedKind) {
				t.Fatalf("expected log output to contain kind %q, got %q", tt.expectedKind, output)
			}
			if !strings.Contains(output, "survey_id=") {
				t.Fatalf("expected log output to contain survey_id, got %q", output)
			}
			if !strings.Contains(output, "environment_id=") {
				t.Fatalf("expected log output to contain environment_id, got %q", output)
			}
			if !strings.Contains(output, "scheduled_for=") {
				t.Fatalf("expected log output to contain scheduled_for, got %q", output)
			}
		})
	}
}

View File

@@ -0,0 +1,14 @@
{
"name": "@formbricks/river-poc-worker",
"version": "0.0.0",
"private": true,
"packageManager": "pnpm@10.32.1",
"scripts": {
"go": "dotenv -e ../../.env -- go run ./cmd/river-poc-worker",
"test": "dotenv -e ../../.env -- go test ./...",
"test:coverage": "dotenv -e ../../.env -- go test ./... -covermode=atomic -coverprofile=coverage.out"
},
"devDependencies": {
"dotenv-cli": "11.0.0"
}
}