fix: execute pipeline on Create Response of Management API (#6712)

Co-authored-by: pandeymangg <anshuman.pandey9999@gmail.com>
This commit is contained in:
Johannes
2025-10-27 10:34:00 -07:00
committed by GitHub
parent 33b9ee3a50
commit 906b4da33c
13 changed files with 596 additions and 20 deletions

View File

@@ -5,6 +5,7 @@ import { handleErrorResponse } from "@/app/api/v1/auth";
import { responses } from "@/app/lib/api/response";
import { transformErrorToDetails } from "@/app/lib/api/validator";
import { TApiAuditLog, TApiKeyAuthentication, withV1ApiWrapper } from "@/app/lib/api/with-api-logging";
import { sendToPipeline } from "@/app/lib/pipelines";
import { deleteResponse, getResponse } from "@/lib/response/service";
import { getSurvey } from "@/lib/survey/service";
import { hasPermission } from "@/modules/organization/settings/api-keys/lib/utils";
@@ -151,6 +152,23 @@ export const PUT = withV1ApiWrapper({
const updated = await updateResponseWithQuotaEvaluation(params.responseId, inputValidation.data);
auditLog.newObject = updated;
sendToPipeline({
event: "responseUpdated",
environmentId: result.survey.environmentId,
surveyId: result.survey.id,
response: updated,
});
if (updated.finished) {
sendToPipeline({
event: "responseFinished",
environmentId: result.survey.environmentId,
surveyId: result.survey.id,
response: updated,
});
}
return {
response: responses.successResponse(updated),
};

View File

@@ -5,6 +5,7 @@ import { TResponse, TResponseInput, ZResponseInput } from "@formbricks/types/res
import { responses } from "@/app/lib/api/response";
import { transformErrorToDetails } from "@/app/lib/api/validator";
import { TApiAuditLog, TApiKeyAuthentication, withV1ApiWrapper } from "@/app/lib/api/with-api-logging";
import { sendToPipeline } from "@/app/lib/pipelines";
import { getSurvey } from "@/lib/survey/service";
import { hasPermission } from "@/modules/organization/settings/api-keys/lib/utils";
import { validateFileUploads } from "@/modules/storage/utils";
@@ -156,6 +157,23 @@ export const POST = withV1ApiWrapper({
const response = await createResponseWithQuotaEvaluation(responseInput);
auditLog.targetId = response.id;
auditLog.newObject = response;
sendToPipeline({
event: "responseCreated",
environmentId: surveyResult.survey.environmentId,
surveyId: response.surveyId,
response: response,
});
if (response.finished) {
sendToPipeline({
event: "responseFinished",
environmentId: surveyResult.survey.environmentId,
surveyId: response.surveyId,
response: response,
});
}
return {
response: responses.successResponse(response, true),
};

View File

@@ -1,7 +1,7 @@
import { z } from "zod";
import { ZodOpenApiOperationObject } from "zod-openapi";
import { ZResponse } from "@formbricks/database/zod/responses";
import { ZResponseInput } from "@formbricks/types/responses";
import { ZResponseUpdateInput } from "@formbricks/types/responses";
import { ZResponseIdSchema } from "@/modules/api/v2/management/responses/[responseId]/types/responses";
import { makePartialSchema } from "@/modules/api/v2/types/openapi-response";
@@ -52,7 +52,8 @@ export const deleteResponseEndpoint: ZodOpenApiOperationObject = {
export const updateResponseEndpoint: ZodOpenApiOperationObject = {
operationId: "updateResponse",
summary: "Update a response",
description: "Updates a response in the database.",
description:
"Updates a response in the database. This will trigger the response pipeline, including webhooks, integrations, follow-up emails (if the response is marked as finished), and other configured actions.",
tags: ["Management API - Responses"],
requestParams: {
path: z.object({
@@ -61,10 +62,10 @@ export const updateResponseEndpoint: ZodOpenApiOperationObject = {
},
requestBody: {
required: true,
description: "The response to update",
description: "The response fields to update",
content: {
"application/json": {
schema: ZResponseInput,
schema: ZResponseUpdateInput,
},
},
},

View File

@@ -4,6 +4,7 @@ import { z } from "zod";
import { prisma } from "@formbricks/database";
import { PrismaErrorType } from "@formbricks/database/types/error";
import { Result, err, ok } from "@formbricks/types/error-handlers";
import { TResponse } from "@formbricks/types/responses";
import { deleteDisplay } from "@/modules/api/v2/management/responses/[responseId]/lib/display";
import { getSurveyQuestions } from "@/modules/api/v2/management/responses/[responseId]/lib/survey";
import { findAndDeleteUploadedFilesInResponse } from "@/modules/api/v2/management/responses/[responseId]/lib/utils";
@@ -32,6 +33,58 @@ export const getResponse = reactCache(async (responseId: string) => {
}
});
export const getResponseForPipeline = async (
responseId: string
): Promise<Result<TResponse, ApiErrorResponseV2>> => {
try {
const responsePrisma = await prisma.response.findUnique({
where: {
id: responseId,
},
include: {
contact: {
select: {
id: true,
},
},
tags: {
select: {
tag: {
select: {
id: true,
createdAt: true,
updatedAt: true,
name: true,
environmentId: true,
},
},
},
},
},
});
if (!responsePrisma) {
return err({ type: "not_found", details: [{ field: "response", issue: "not found" }] });
}
return ok({
...responsePrisma,
contact: responsePrisma.contact
? {
id: responsePrisma.contact.id,
userId: responsePrisma.contactAttributes?.userId,
}
: null,
tags: responsePrisma.tags.map((t) => t.tag),
});
} catch (error) {
return err({
type: "internal_server_error",
details: [{ field: "response", issue: error.message }],
});
}
};
export const deleteResponse = async (responseId: string): Promise<Result<Response, ApiErrorResponseV2>> => {
try {
const deletedResponse = await prisma.response.delete({

View File

@@ -7,7 +7,13 @@ import { ok, okVoid } from "@formbricks/types/error-handlers";
import { TSurveyQuota } from "@formbricks/types/quota";
import { evaluateResponseQuotas } from "@/modules/ee/quotas/lib/evaluation-service";
import { deleteDisplay } from "../display";
import { deleteResponse, getResponse, updateResponse, updateResponseWithQuotaEvaluation } from "../response";
import {
deleteResponse,
getResponse,
getResponseForPipeline,
updateResponse,
updateResponseWithQuotaEvaluation,
} from "../response";
import { getSurveyQuestions } from "../survey";
import { findAndDeleteUploadedFilesInResponse } from "../utils";
@@ -106,6 +112,177 @@ describe("Response Lib", () => {
});
});
describe("getResponseForPipeline", () => {
test("return the response with contact and tags when found", async () => {
const mockPrismaResponse = {
id: responseId,
createdAt: new Date(),
updatedAt: new Date(),
surveyId: "kbr8tnr2q2vgztyrfnqlgfjt",
displayId: "jowdit1qrf04t97jcc0io9di",
finished: true,
data: { question1: "answer1" },
meta: {},
ttc: {},
variables: {},
contactAttributes: { userId: "user123" },
singleUseId: null,
language: "en",
endingId: null,
contact: {
id: "olwablfltg9eszoh0nz83w02",
},
tags: [
{
tag: {
id: "tag123",
createdAt: new Date(),
updatedAt: new Date(),
name: "important",
environmentId: "env123",
},
},
],
};
vi.mocked(prisma.response.findUnique).mockResolvedValue(mockPrismaResponse as any);
const result = await getResponseForPipeline(responseId);
expect(result.ok).toBe(true);
if (result.ok) {
expect(result.data).toEqual({
...mockPrismaResponse,
contact: {
id: "olwablfltg9eszoh0nz83w02",
userId: "user123",
},
tags: [
{
id: "tag123",
createdAt: mockPrismaResponse.tags[0].tag.createdAt,
updatedAt: mockPrismaResponse.tags[0].tag.updatedAt,
name: "important",
environmentId: "env123",
},
],
});
}
expect(prisma.response.findUnique).toHaveBeenCalledWith({
where: { id: responseId },
include: {
contact: {
select: {
id: true,
},
},
tags: {
select: {
tag: {
select: {
id: true,
createdAt: true,
updatedAt: true,
name: true,
environmentId: true,
},
},
},
},
},
});
});
test("return the response with null contact when contact does not exist", async () => {
const mockPrismaResponseWithoutContact = {
id: responseId,
createdAt: new Date(),
updatedAt: new Date(),
surveyId: "kbr8tnr2q2vgztyrfnqlgfjt",
displayId: "jowdit1qrf04t97jcc0io9di",
finished: true,
data: { question1: "answer1" },
meta: {},
ttc: {},
variables: {},
contactAttributes: null,
singleUseId: null,
language: "en",
endingId: null,
contact: null,
tags: [],
};
vi.mocked(prisma.response.findUnique).mockResolvedValue(mockPrismaResponseWithoutContact as any);
const result = await getResponseForPipeline(responseId);
expect(result.ok).toBe(true);
if (result.ok) {
expect(result.data.contact).toBeNull();
expect(result.data.tags).toEqual([]);
}
});
test("return a not_found error when the response is missing", async () => {
vi.mocked(prisma.response.findUnique).mockResolvedValue(null);
const result = await getResponseForPipeline(responseId);
expect(result.ok).toBe(false);
if (!result.ok) {
expect(result.error).toEqual({
type: "not_found",
details: [{ field: "response", issue: "not found" }],
});
}
});
test("return an internal_server_error when prisma throws an error", async () => {
vi.mocked(prisma.response.findUnique).mockRejectedValue(new Error("DB error"));
const result = await getResponseForPipeline(responseId);
expect(result.ok).toBe(false);
if (!result.ok) {
expect(result.error).toEqual({
type: "internal_server_error",
details: [{ field: "response", issue: "DB error" }],
});
}
});
test("handle response with contact but no userId in contactAttributes", async () => {
const mockPrismaResponse = {
id: responseId,
createdAt: new Date(),
updatedAt: new Date(),
surveyId: "kbr8tnr2q2vgztyrfnqlgfjt",
displayId: null,
finished: false,
data: {},
meta: {},
ttc: {},
variables: {},
contactAttributes: {},
singleUseId: null,
language: "en",
endingId: null,
contact: {
id: "contact-id",
},
tags: [],
};
vi.mocked(prisma.response.findUnique).mockResolvedValue(mockPrismaResponse as any);
const result = await getResponseForPipeline(responseId);
expect(result.ok).toBe(true);
if (result.ok) {
expect(result.data.contact).toEqual({
id: "contact-id",
userId: undefined,
});
}
});
});
describe("deleteResponse", () => {
test("delete the response, delete the display and remove uploaded files", async () => {
vi.mocked(prisma.response.delete).mockResolvedValue(response);

View File

@@ -1,4 +1,5 @@
import { z } from "zod";
import { sendToPipeline } from "@/app/lib/pipelines";
import { authenticatedApiClient } from "@/modules/api/v2/auth/authenticated-api-client";
import { validateOtherOptionLengthForMultipleChoice } from "@/modules/api/v2/lib/question";
import { responses } from "@/modules/api/v2/lib/response";
@@ -7,6 +8,7 @@ import { getEnvironmentId } from "@/modules/api/v2/management/lib/helper";
import {
deleteResponse,
getResponse,
getResponseForPipeline,
updateResponseWithQuotaEvaluation,
} from "@/modules/api/v2/management/responses/[responseId]/lib/response";
import { getSurveyQuestions } from "@/modules/api/v2/management/responses/[responseId]/lib/survey";
@@ -124,7 +126,7 @@ export const PUT = (request: Request, props: { params: Promise<{ responseId: str
request,
{
type: "bad_request",
details: [{ field: !body ? "body" : "params", issue: "missing" }],
details: [{ field: body ? "params" : "body", issue: "missing" }],
},
auditLog
);
@@ -196,6 +198,26 @@ export const PUT = (request: Request, props: { params: Promise<{ responseId: str
return handleApiError(request, response.error as ApiErrorResponseV2, auditLog); // NOSONAR // We need to assert or we get a type error
}
// Fetch updated response with relations for pipeline
const updatedResponseForPipeline = await getResponseForPipeline(params.responseId);
if (updatedResponseForPipeline.ok) {
sendToPipeline({
event: "responseUpdated",
environmentId: environmentIdResult.data,
surveyId: existingResponse.data.surveyId,
response: updatedResponseForPipeline.data,
});
if (response.data.finished) {
sendToPipeline({
event: "responseFinished",
environmentId: environmentIdResult.data,
surveyId: existingResponse.data.surveyId,
response: updatedResponseForPipeline.data,
});
}
}
if (auditLog) {
auditLog.oldObject = existingResponse.data;
auditLog.newObject = response.data;

View File

@@ -0,0 +1,62 @@
import "server-only";
import { prisma } from "@formbricks/database";
import { TContactAttributes } from "@formbricks/types/contact-attribute";
import { Result, err, ok } from "@formbricks/types/error-handlers";
import { ApiErrorResponseV2 } from "@/modules/api/v2/types/api-error";
export const getContactByUserId = async (
environmentId: string,
userId: string
): Promise<
Result<
{
id: string;
attributes: TContactAttributes;
} | null,
ApiErrorResponseV2
>
> => {
try {
const contact = await prisma.contact.findFirst({
where: {
attributes: {
some: {
attributeKey: {
key: "userId",
environmentId,
},
value: userId,
},
},
},
select: {
id: true,
attributes: {
select: {
attributeKey: { select: { key: true } },
value: true,
},
},
},
});
if (!contact) {
return ok(null);
}
const contactAttributes = contact.attributes.reduce((acc, attr) => {
acc[attr.attributeKey.key] = attr.value;
return acc;
}, {}) as TContactAttributes;
return ok({
id: contact.id,
attributes: contactAttributes,
});
} catch (error) {
return err({
type: "internal_server_error",
details: [{ field: "contact", issue: error.message }],
});
}
};

View File

@@ -32,7 +32,8 @@ export const getResponsesEndpoint: ZodOpenApiOperationObject = {
export const createResponseEndpoint: ZodOpenApiOperationObject = {
operationId: "createResponse",
summary: "Create a response",
description: "Creates a response in the database.",
description:
"Creates a response in the database. This will trigger the response pipeline, including webhooks, integrations, follow-up emails, and other configured actions.",
tags: ["Management API - Responses"],
requestBody: {
required: true,

View File

@@ -2,11 +2,13 @@ import "server-only";
import { Prisma, Response } from "@prisma/client";
import { prisma } from "@formbricks/database";
import { logger } from "@formbricks/logger";
import { TContactAttributes } from "@formbricks/types/contact-attribute";
import { Result, err, ok } from "@formbricks/types/error-handlers";
import { IS_FORMBRICKS_CLOUD } from "@/lib/constants";
import { sendPlanLimitsReachedEventToPosthogWeekly } from "@/lib/posthogServer";
import { calculateTtcTotal } from "@/lib/response/utils";
import { captureTelemetry } from "@/lib/telemetry";
import { getContactByUserId } from "@/modules/api/v2/management/responses/lib/contact";
import {
getMonthlyOrganizationResponseCount,
getOrganizationBilling,
@@ -54,6 +56,7 @@ export const createResponse = async (
const {
surveyId,
displayId,
userId,
finished,
data,
language,
@@ -67,6 +70,17 @@ export const createResponse = async (
} = responseInput;
try {
let contact: { id: string; attributes: TContactAttributes } | null = null;
// If userId is provided, look up the contact by userId
if (userId) {
const contactResult = await getContactByUserId(environmentId, userId);
if (!contactResult.ok) {
return err(contactResult.error);
}
contact = contactResult.data;
}
let ttc = {};
if (initialTtc) {
if (finished) {
@@ -83,6 +97,14 @@ export const createResponse = async (
},
},
display: displayId ? { connect: { id: displayId } } : undefined,
...(contact?.id && {
contact: {
connect: {
id: contact.id,
},
},
contactAttributes: contact.attributes,
}),
finished,
data,
language,

View File

@@ -0,0 +1,176 @@
import { describe, expect, test, vi } from "vitest";
import { prisma } from "@formbricks/database";
import { TContactAttributes } from "@formbricks/types/contact-attribute";
import { getContactByUserId } from "../contact";
// Mock prisma
vi.mock("@formbricks/database", () => ({
prisma: {
contact: {
findFirst: vi.fn(),
},
},
}));
const environmentId = "test-env-id";
const userId = "test-user-id";
const contactId = "test-contact-id";
const mockContactDbData = {
id: contactId,
environmentId,
createdAt: new Date(),
updatedAt: new Date(),
attributes: [
{ attributeKey: { key: "userId" }, value: userId },
{ attributeKey: { key: "email" }, value: "test@example.com" },
{ attributeKey: { key: "plan" }, value: "premium" },
],
};
const expectedContactAttributes: TContactAttributes = {
userId: userId,
email: "test@example.com",
plan: "premium",
};
describe("getContactByUserId", () => {
test("should return ok result with contact and attributes when found", async () => {
vi.mocked(prisma.contact.findFirst).mockResolvedValue(mockContactDbData);
const result = await getContactByUserId(environmentId, userId);
expect(prisma.contact.findFirst).toHaveBeenCalledWith({
where: {
attributes: {
some: {
attributeKey: {
key: "userId",
environmentId,
},
value: userId,
},
},
},
select: {
id: true,
attributes: {
select: {
attributeKey: { select: { key: true } },
value: true,
},
},
},
});
expect(result.ok).toBe(true);
if (result.ok) {
expect(result.data).toEqual({
id: contactId,
attributes: expectedContactAttributes,
});
}
});
test("should return ok result with null when contact is not found", async () => {
vi.mocked(prisma.contact.findFirst).mockResolvedValue(null);
const result = await getContactByUserId(environmentId, userId);
expect(prisma.contact.findFirst).toHaveBeenCalledWith({
where: {
attributes: {
some: {
attributeKey: {
key: "userId",
environmentId,
},
value: userId,
},
},
},
select: {
id: true,
attributes: {
select: {
attributeKey: { select: { key: true } },
value: true,
},
},
},
});
expect(result.ok).toBe(true);
if (result.ok) {
expect(result.data).toBeNull();
}
});
test("should return error result when database throws an error", async () => {
const errorMessage = "Database connection failed";
vi.mocked(prisma.contact.findFirst).mockRejectedValue(new Error(errorMessage));
const result = await getContactByUserId(environmentId, userId);
expect(result.ok).toBe(false);
if (!result.ok) {
expect(result.error).toEqual({
type: "internal_server_error",
details: [{ field: "contact", issue: errorMessage }],
});
}
});
test("should correctly transform multiple attributes", async () => {
const mockContactWithManyAttributes = {
id: contactId,
environmentId,
createdAt: new Date(),
updatedAt: new Date(),
attributes: [
{ attributeKey: { key: "userId" }, value: "user123" },
{ attributeKey: { key: "email" }, value: "multi@example.com" },
{ attributeKey: { key: "firstName" }, value: "John" },
{ attributeKey: { key: "lastName" }, value: "Doe" },
{ attributeKey: { key: "company" }, value: "Acme Corp" },
],
};
vi.mocked(prisma.contact.findFirst).mockResolvedValue(mockContactWithManyAttributes);
const result = await getContactByUserId(environmentId, userId);
expect(result.ok).toBe(true);
if (result.ok) {
expect(result.data?.attributes).toEqual({
userId: "user123",
email: "multi@example.com",
firstName: "John",
lastName: "Doe",
company: "Acme Corp",
});
}
});
test("should handle contact with empty attributes array", async () => {
const mockContactWithNoAttributes = {
id: contactId,
environmentId,
createdAt: new Date(),
updatedAt: new Date(),
attributes: [],
};
vi.mocked(prisma.contact.findFirst).mockResolvedValue(mockContactWithNoAttributes);
const result = await getContactByUserId(environmentId, userId);
expect(result.ok).toBe(true);
if (result.ok) {
expect(result.data).toEqual({
id: contactId,
attributes: {},
});
}
});
});

View File

@@ -1,10 +1,12 @@
import { Response } from "@prisma/client";
import { NextRequest } from "next/server";
import { sendToPipeline } from "@/app/lib/pipelines";
import { authenticatedApiClient } from "@/modules/api/v2/auth/authenticated-api-client";
import { validateOtherOptionLengthForMultipleChoice } from "@/modules/api/v2/lib/question";
import { responses } from "@/modules/api/v2/lib/response";
import { handleApiError } from "@/modules/api/v2/lib/utils";
import { getEnvironmentId } from "@/modules/api/v2/management/lib/helper";
import { getResponseForPipeline } from "@/modules/api/v2/management/responses/[responseId]/lib/response";
import { getSurveyQuestions } from "@/modules/api/v2/management/responses/[responseId]/lib/survey";
import { ZGetResponsesFilter, ZResponseInput } from "@/modules/api/v2/management/responses/types/responses";
import { ApiErrorResponseV2 } from "@/modules/api/v2/types/api-error";
@@ -131,6 +133,26 @@ export const POST = async (request: Request) =>
return handleApiError(request, createResponseResult.error, auditLog);
}
// Fetch created response with relations for pipeline
const createdResponseForPipeline = await getResponseForPipeline(createResponseResult.data.id);
if (createdResponseForPipeline.ok) {
sendToPipeline({
event: "responseCreated",
environmentId: environmentId,
surveyId: body.surveyId,
response: createdResponseForPipeline.data,
});
if (createResponseResult.data.finished) {
sendToPipeline({
event: "responseFinished",
environmentId: environmentId,
surveyId: body.surveyId,
response: createdResponseForPipeline.data,
});
}
}
if (auditLog) {
auditLog.targetId = createResponseResult.data.id;
auditLog.newObject = createResponseResult.data;

View File

@@ -32,16 +32,20 @@ export const ZResponseInput = ZResponse.pick({
variables: true,
ttc: true,
meta: true,
}).partial({
displayId: true,
singleUseId: true,
endingId: true,
language: true,
variables: true,
ttc: true,
meta: true,
createdAt: true,
updatedAt: true,
});
})
.partial({
displayId: true,
singleUseId: true,
endingId: true,
language: true,
variables: true,
ttc: true,
meta: true,
createdAt: true,
updatedAt: true,
})
.extend({
userId: z.string().optional(),
});
export type TResponseInput = z.infer<typeof ZResponseInput>;

View File

@@ -5006,7 +5006,7 @@
"tags": ["Management API - Response"]
},
"post": {
"description": "Create a user response using the management API",
"description": "Create a user response using the management API. This will trigger the response pipeline, including webhooks, integrations, follow-up emails, and other configured actions.",
"parameters": [
{
"example": "{{apiKey}}",
@@ -5543,7 +5543,7 @@
"tags": ["Management API - Response"]
},
"put": {
"description": "Update an existing user response with new data",
"description": "Update an existing user response with new data. This will trigger the response pipeline, including webhooks, integrations, follow-up emails (if the response is marked as finished), and other configured actions.",
"parameters": [
{
"example": "{{apiKey}}",