mirror of https://github.com/formbricks/formbricks.git (synced 2025-12-23 06:30:51 -06:00)

Compare commits: email-issu ... feat/bulk- (30 commits)
| SHA1 |
|---|
| 62b4c85a10 |
| 49fbf097f8 |
| 3a40568366 |
| f8c8b8c45d |
| eaeaa74ba8 |
| 5f90968e61 |
| d05a7c6d98 |
| 0c6c554cef |
| bb3ff6829d |
| 425edf4cac |
| 8052ee0aaf |
| 2f15312d5c |
| 5196c77277 |
| bd9efff3ff |
| 93907263a6 |
| 3ed35523be |
| 8da23c2e41 |
| cea7139b40 |
| d873e5b759 |
| cda1109ffc |
| b120de550f |
| 3f9c1c57f9 |
| 9abb07deba |
| f665e05723 |
| ed870ea0ce |
| b5212e0e0e |
| a16dcee01d |
| af9dfe63ca |
| e12d6a5d2d |
| f8bd0902d2 |
@@ -1,6 +1,6 @@
 import {
   OPTIONS,
   PUT,
-} from "@/modules/ee/contacts/api/client/[environmentId]/contacts/[userId]/attributes/route";
+} from "@/modules/ee/contacts/api/v1/client/[environmentId]/contacts/[userId]/attributes/route";

 export { OPTIONS, PUT };

@@ -1,6 +1,6 @@
 import {
   GET,
   OPTIONS,
-} from "@/modules/ee/contacts/api/client/[environmentId]/identify/contacts/[userId]/route";
+} from "@/modules/ee/contacts/api/v1/client/[environmentId]/identify/contacts/[userId]/route";

 export { GET, OPTIONS };

@@ -1,3 +1,3 @@
-import { OPTIONS, POST } from "@/modules/ee/contacts/api/client/[environmentId]/user/route";
+import { OPTIONS, POST } from "@/modules/ee/contacts/api/v1/client/[environmentId]/user/route";

 export { POST, OPTIONS };

@@ -2,6 +2,6 @@ import {
   DELETE,
   GET,
   PUT,
-} from "@/modules/ee/contacts/api/management/contact-attribute-keys/[contactAttributeKeyId]/route";
+} from "@/modules/ee/contacts/api/v1/management/contact-attribute-keys/[contactAttributeKeyId]/route";

 export { DELETE, GET, PUT };

@@ -1,3 +1,3 @@
-import { GET, POST } from "@/modules/ee/contacts/api/management/contact-attribute-keys/route";
+import { GET, POST } from "@/modules/ee/contacts/api/v1/management/contact-attribute-keys/route";

 export { GET, POST };

@@ -1,3 +1,3 @@
-import { GET } from "@/modules/ee/contacts/api/management/contact-attributes/route";
+import { GET } from "@/modules/ee/contacts/api/v1/management/contact-attributes/route";

 export { GET };

@@ -1,3 +1,3 @@
-import { DELETE, GET } from "@/modules/ee/contacts/api/management/contacts/[contactId]/route";
+import { DELETE, GET } from "@/modules/ee/contacts/api/v1/management/contacts/[contactId]/route";

 export { DELETE, GET };

@@ -1,4 +1,4 @@
-import { GET } from "@/modules/ee/contacts/api/management/contacts/route";
+import { GET } from "@/modules/ee/contacts/api/v1/management/contacts/route";

 export { GET };

@@ -1,6 +1,6 @@
 import {
   OPTIONS,
   PUT,
-} from "@/modules/ee/contacts/api/client/[environmentId]/contacts/[userId]/attributes/route";
+} from "@/modules/ee/contacts/api/v1/client/[environmentId]/contacts/[userId]/attributes/route";

 export { OPTIONS, PUT };

@@ -1,6 +1,6 @@
 import {
   GET,
   OPTIONS,
-} from "@/modules/ee/contacts/api/client/[environmentId]/identify/contacts/[userId]/route";
+} from "@/modules/ee/contacts/api/v1/client/[environmentId]/identify/contacts/[userId]/route";

 export { GET, OPTIONS };

@@ -1,3 +1,3 @@
-import { OPTIONS, POST } from "@/modules/ee/contacts/api/client/[environmentId]/user/route";
+import { OPTIONS, POST } from "@/modules/ee/contacts/api/v1/client/[environmentId]/user/route";

 export { POST, OPTIONS };
apps/web/app/api/v2/management/contacts/bulk/route.ts (new file, 3 lines)
@@ -0,0 +1,3 @@
import { PUT } from "@/modules/ee/contacts/api/v2/management/contacts/bulk/route";

export { PUT };
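For orientation, here is a minimal sketch of how a client could call the new bulk endpoint wired up above. The host and the x-api-key header are assumptions for illustration (they are not part of this diff); the body shape follows ZContactBulkUploadRequest (at most 1000 contacts, each with an email attribute):

// Hypothetical client call for PUT /api/v2/management/contacts/bulk (TypeScript sketch).
const res = await fetch("https://app.formbricks.com/api/v2/management/contacts/bulk", {
  method: "PUT",
  headers: {
    "Content-Type": "application/json",
    "x-api-key": process.env.FORMBRICKS_API_KEY ?? "", // assumed management API key header
  },
  body: JSON.stringify({
    contacts: [
      {
        attributes: [
          { attributeKey: { key: "email", name: "Email" }, value: "john@example.com" },
          { attributeKey: { key: "userId", name: "User ID" }, value: "user-123" },
        ],
      },
    ],
  }),
});
// Per the rest of this diff: 200 = all contacts upserted, 207 = some contacts skipped due to
// conflicting userIds, 403 = the contacts feature is not enabled.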
@@ -39,7 +39,12 @@ const enforceHttps = (request: NextRequest): Response | null => {
   if (IS_PRODUCTION && !E2E_TESTING && forwardedProto !== "https") {
     const apiError: ApiErrorResponseV2 = {
       type: "forbidden",
-      details: [{ field: "", issue: "Only HTTPS connections are allowed on the management endpoint." }],
+      details: [
+        {
+          field: "",
+          issue: "Only HTTPS connections are allowed on the management and contacts bulk endpoints.",
+        },
+      ],
     };
     logApiError(request, apiError);
     return NextResponse.json(apiError, { status: 403 });
@@ -257,6 +257,34 @@ const successResponse = ({
  );
};

export const multiStatusResponse = ({
  data,
  meta,
  cors = false,
  cache = "private, no-store",
}: {
  data: Object;
  meta?: Record<string, unknown>;
  cors?: boolean;
  cache?: string;
}) => {
  const headers = {
    ...(cors && corsHeaders),
    "Cache-Control": cache,
  };

  return Response.json(
    {
      data,
      meta,
    } as ApiSuccessResponse,
    {
      status: 207,
      headers,
    }
  );
};

export const responses = {
  badRequestResponse,
  unauthorizedResponse,
@@ -267,4 +295,5 @@ export const responses = {
  tooManyRequestsResponse,
  internalServerErrorResponse,
  successResponse,
  multiStatusResponse,
};
@@ -1,6 +1,6 @@
 import { responses } from "@/modules/api/v2/lib/response";
 import { ApiErrorResponseV2 } from "@/modules/api/v2/types/api-error";
-import { ZodError } from "zod";
+import { ZodCustomIssue, ZodIssue } from "zod";
 import { logger } from "@formbricks/logger";

 export const handleApiError = (request: Request, err: ApiErrorResponseV2): Response => {
@@ -34,11 +34,16 @@ export const handleApiError = (request: Request, err: ApiErrorResponseV2): Respo
   }
 };

-export const formatZodError = (error: ZodError) => {
-  return error.issues.map((issue) => ({
-    field: issue.path.join("."),
-    issue: issue.message,
-  }));
+export const formatZodError = (error: { issues: (ZodIssue | ZodCustomIssue)[] }) => {
+  return error.issues.map((issue) => {
+    const issueParams = issue.code === "custom" ? issue.params : undefined;
+
+    return {
+      field: issue.path.join("."),
+      issue: issue.message ?? "An error occurred while processing your request. Please try again later.",
+      ...(issueParams && { meta: issueParams }),
+    };
+  });
 };

 export const logApiRequest = (request: Request, responseStatus: number): void => {
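To make the new formatZodError behavior concrete, a small sketch with an invented input: a custom Zod issue that carries params is mapped to a detail object whose params surface under meta, while non-custom issues keep only field and issue:

// Sketch only; the issue object below is hand-built for illustration.
const details = formatZodError({
  issues: [
    {
      code: "custom",
      path: ["contacts"],
      message: "Duplicate emails found in the records, please ensure each email is unique.",
      params: { duplicateEmails: ["john@example.com"] },
    } as ZodCustomIssue,
  ],
});
// details[0] is:
// {
//   field: "contacts",
//   issue: "Duplicate emails found in the records, please ensure each email is unique.",
//   meta: { duplicateEmails: ["john@example.com"] },
// }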
@@ -2,7 +2,6 @@ import { checkRateLimitAndThrowError } from "@/modules/api/v2/lib/rate-limit";
 import { formatZodError, handleApiError } from "@/modules/api/v2/lib/utils";
 import { ZodRawShape, z } from "zod";
 import { TAuthenticationApiKey } from "@formbricks/types/auth";
-import { err } from "@formbricks/types/error-handlers";
 import { authenticateRequest } from "./authenticate-request";

 export type HandlerFn<TInput = Record<string, unknown>> = ({
@@ -41,65 +40,63 @@ export const apiWrapper = async <S extends ExtendedSchemas>({
   rateLimit?: boolean;
   handler: HandlerFn<ParsedSchemas<S>>;
 }): Promise<Response> => {
-  try {
-    const authentication = await authenticateRequest(request);
-    if (!authentication.ok) return handleApiError(request, authentication.error);
-
-    let parsedInput: ParsedSchemas<S> = {} as ParsedSchemas<S>;
-
-    if (schemas?.body) {
-      const bodyData = await request.json();
-      const bodyResult = schemas.body.safeParse(bodyData);
-
-      if (!bodyResult.success) {
-        throw err({
-          type: "bad_request",
-          details: formatZodError(bodyResult.error),
-        });
-      }
-      parsedInput.body = bodyResult.data as ParsedSchemas<S>["body"];
-    }
-
-    if (schemas?.query) {
-      const url = new URL(request.url);
-      const queryObject = Object.fromEntries(url.searchParams.entries());
-      const queryResult = schemas.query.safeParse(queryObject);
-      if (!queryResult.success) {
-        throw err({
-          type: "unprocessable_entity",
-          details: formatZodError(queryResult.error),
-        });
-      }
-      parsedInput.query = queryResult.data as ParsedSchemas<S>["query"];
-    }
-
-    if (schemas?.params) {
-      const paramsObject = (await externalParams) || {};
-      const paramsResult = schemas.params.safeParse(paramsObject);
-      if (!paramsResult.success) {
-        throw err({
-          type: "unprocessable_entity",
-          details: formatZodError(paramsResult.error),
-        });
-      }
-      parsedInput.params = paramsResult.data as ParsedSchemas<S>["params"];
-    }
-
-    if (rateLimit) {
-      const rateLimitResponse = await checkRateLimitAndThrowError({
-        identifier: authentication.data.hashedApiKey,
-      });
-      if (!rateLimitResponse.ok) {
-        throw rateLimitResponse.error;
-      }
-    }
-
-    return handler({
-      authentication: authentication.data,
-      parsedInput,
-      request,
-    });
-  } catch (err) {
-    return handleApiError(request, err.error);
+  const authentication = await authenticateRequest(request);
+  if (!authentication.ok) {
+    return handleApiError(request, authentication.error);
+  }
+
+  let parsedInput: ParsedSchemas<S> = {} as ParsedSchemas<S>;
+
+  if (schemas?.body) {
+    const bodyData = await request.json();
+    const bodyResult = schemas.body.safeParse(bodyData);
+
+    if (!bodyResult.success) {
+      return handleApiError(request, {
+        type: "unprocessable_entity",
+        details: formatZodError(bodyResult.error),
+      });
+    }
+    parsedInput.body = bodyResult.data as ParsedSchemas<S>["body"];
+  }
+
+  if (schemas?.query) {
+    const url = new URL(request.url);
+    const queryObject = Object.fromEntries(url.searchParams.entries());
+    const queryResult = schemas.query.safeParse(queryObject);
+    if (!queryResult.success) {
+      return handleApiError(request, {
+        type: "unprocessable_entity",
+        details: formatZodError(queryResult.error),
+      });
+    }
+    parsedInput.query = queryResult.data as ParsedSchemas<S>["query"];
+  }
+
+  if (schemas?.params) {
+    const paramsObject = (await externalParams) || {};
+    const paramsResult = schemas.params.safeParse(paramsObject);
+    if (!paramsResult.success) {
+      return handleApiError(request, {
+        type: "unprocessable_entity",
+        details: formatZodError(paramsResult.error),
+      });
+    }
+    parsedInput.params = paramsResult.data as ParsedSchemas<S>["params"];
+  }
+
+  if (rateLimit) {
+    const rateLimitResponse = await checkRateLimitAndThrowError({
+      identifier: authentication.data.hashedApiKey,
+    });
+    if (!rateLimitResponse.ok) {
+      return handleApiError(request, rateLimitResponse.error);
+    }
+  }
+
+  return handler({
+    authentication: authentication.data,
+    parsedInput,
+    request,
+  });
 };
@@ -1,4 +1,5 @@
-import { logApiRequest } from "@/modules/api/v2/lib/utils";
+import { handleApiError, logApiRequest } from "@/modules/api/v2/lib/utils";
+import { ApiErrorResponseV2 } from "@/modules/api/v2/types/api-error";
 import { ExtendedSchemas, HandlerFn, ParsedSchemas, apiWrapper } from "./api-wrapper";

 export const authenticatedApiClient = async <S extends ExtendedSchemas>({
@@ -14,16 +15,28 @@ export const authenticatedApiClient = async <S extends ExtendedSchemas>({
   rateLimit?: boolean;
   handler: HandlerFn<ParsedSchemas<S>>;
 }): Promise<Response> => {
-  const response = await apiWrapper({
-    request,
-    schemas,
-    externalParams,
-    rateLimit,
-    handler,
-  });
-  if (response.ok) {
-    logApiRequest(request, response.status);
-  }
+  try {
+    const response = await apiWrapper({
+      request,
+      schemas,
+      externalParams,
+      rateLimit,
+      handler,
+    });

-  return response;
+    if (response.ok) {
+      logApiRequest(request, response.status);
+    }
+
+    return response;
+  } catch (err) {
+    if ("type" in err) {
+      return handleApiError(request, err as ApiErrorResponseV2);
+    }
+
+    return handleApiError(request, {
+      type: "internal_server_error",
+      details: [{ field: "error", issue: "An error occurred while processing your request." }],
+    });
+  }
 };
@@ -19,6 +19,11 @@ vi.mock("@/modules/api/v2/lib/utils", () => ({
   handleApiError: vi.fn(),
 }));

+vi.mock("@/modules/api/v2/lib/utils", () => ({
+  formatZodError: vi.fn(),
+  handleApiError: vi.fn(),
+}));
+
 describe("apiWrapper", () => {
   it("should handle request and return response", async () => {
     const request = new Request("http://localhost", {
@@ -5,6 +5,7 @@ import { responsePaths } from "@/modules/api/v2/management/responses/lib/openapi
 import { rolePaths } from "@/modules/api/v2/management/roles/lib/openapi";
 import { surveyPaths } from "@/modules/api/v2/management/surveys/lib/openapi";
 import { webhookPaths } from "@/modules/api/v2/management/webhooks/lib/openapi";
+import { bulkContactPaths } from "@/modules/ee/contacts/api/v2/management/contacts/bulk/lib/openapi";
 import * as yaml from "yaml";
 import { z } from "zod";
 import { createDocument, extendZodWithOpenApi } from "zod-openapi";
@@ -26,6 +27,7 @@ const document = createDocument({
   },
   paths: {
     ...responsePaths,
+    ...bulkContactPaths,
     ...contactPaths,
     ...contactAttributePaths,
     ...contactAttributeKeyPaths,
@@ -1,4 +1,12 @@
-export type ApiErrorDetails = { field: string; issue: string }[];
+// We're naming the "params" field from zod (or otherwise) to "meta" since "params" is a bit confusing
+// We're still using the "params" type from zod though because it allows us to not reference `any` and directly use the zod types
+export type ApiErrorDetails = {
+  field: string;
+  issue: string;
+  meta?: {
+    [k: string]: unknown;
+  };
+}[];

 export type ApiErrorResponseV2 =
   | {
@@ -1,6 +1,6 @@
 import { contactCache } from "@/lib/cache/contact";
 import { contactAttributeCache } from "@/lib/cache/contact-attribute";
-import { getContactByUserId } from "@/modules/ee/contacts/api/client/[environmentId]/identify/contacts/[userId]/lib/contact";
+import { getContactByUserId } from "@/modules/ee/contacts/api/v1/client/[environmentId]/identify/contacts/[userId]/lib/contact";
 import { prisma } from "@formbricks/database";
 import { cache } from "@formbricks/lib/cache";
 import { segmentCache } from "@formbricks/lib/cache/segment";
@@ -1,5 +1,5 @@
 import { contactAttributeCache } from "@/lib/cache/contact-attribute";
-import { getContactAttributes } from "@/modules/ee/contacts/api/client/[environmentId]/identify/contacts/[userId]/lib/attributes";
+import { getContactAttributes } from "@/modules/ee/contacts/api/v1/client/[environmentId]/identify/contacts/[userId]/lib/attributes";
 import { evaluateSegment } from "@/modules/ee/contacts/segments/lib/segments";
 import { Prisma } from "@prisma/client";
 import { cache as reactCache } from "react";
@@ -0,0 +1,401 @@
import { contactCache } from "@/lib/cache/contact";
import { contactAttributeCache } from "@/lib/cache/contact-attribute";
import { contactAttributeKeyCache } from "@/lib/cache/contact-attribute-key";
import { ApiErrorResponseV2 } from "@/modules/api/v2/types/api-error";
import { TContactBulkUploadContact } from "@/modules/ee/contacts/types/contact";
import { createId } from "@paralleldrive/cuid2";
import { Prisma } from "@prisma/client";
import { prisma } from "@formbricks/database";
import { logger } from "@formbricks/logger";
import { Result, err, ok } from "@formbricks/types/error-handlers";

export const upsertBulkContacts = async (
  contacts: TContactBulkUploadContact[],
  environmentId: string,
  parsedEmails: string[]
): Promise<
  Result<
    {
      contactIdxWithConflictingUserIds: number[];
    },
    ApiErrorResponseV2
  >
> => {
  const emailAttributeKey = "email";
  const contactIdxWithConflictingUserIds: number[] = [];

  let userIdsInContacts: string[] = [];
  let attributeKeysSet: Set<string> = new Set();
  let attributeKeys: string[] = [];

  // both can be done with a single loop:
  contacts.forEach((contact) => {
    contact.attributes.forEach((attr) => {
      if (attr.attributeKey.key === "userId") {
        userIdsInContacts.push(attr.value);
      }

      if (!attributeKeysSet.has(attr.attributeKey.key)) {
        attributeKeys.push(attr.attributeKey.key);
      }

      // Add the attribute key to the set
      attributeKeysSet.add(attr.attributeKey.key);
    });
  });

  const [existingUserIds, existingContactsByEmail, existingAttributeKeys] = await Promise.all([
    prisma.contactAttribute.findMany({
      where: {
        attributeKey: {
          environmentId,
          key: "userId",
        },
        value: {
          in: userIdsInContacts,
        },
      },
      select: {
        value: true,
      },
    }),

    prisma.contact.findMany({
      where: {
        environmentId,
        attributes: {
          some: {
            attributeKey: { key: emailAttributeKey },
            value: { in: parsedEmails },
          },
        },
      },
      select: {
        attributes: {
          select: {
            attributeKey: { select: { key: true } },
            createdAt: true,
            id: true,
            value: true,
          },
        },
        id: true,
      },
    }),

    prisma.contactAttributeKey.findMany({
      where: {
        key: { in: attributeKeys },
        environmentId,
      },
    }),
  ]);

  // Build a map from email to contact id (if the email attribute exists)
  const contactMap = new Map<
    string,
    {
      contactId: string;
      attributes: { id: string; attributeKey: { key: string }; createdAt: Date; value: string }[];
    }
  >();

  existingContactsByEmail.forEach((contact) => {
    const emailAttr = contact.attributes.find((attr) => attr.attributeKey.key === emailAttributeKey);

    if (emailAttr) {
      contactMap.set(emailAttr.value, {
        contactId: contact.id,
        attributes: contact.attributes.map((attr) => ({
          id: attr.id,
          attributeKey: { key: attr.attributeKey.key },
          createdAt: attr.createdAt,
          value: attr.value,
        })),
      });
    }
  });

  // Split contacts into ones to update and ones to create
  const contactsToUpdate: {
    contactId: string;
    attributes: {
      id: string;
      createdAt: Date;
      value: string;
      attributeKey: {
        key: string;
      };
    }[];
  }[] = [];

  const contactsToCreate: {
    attributes: {
      value: string;
      attributeKey: {
        key: string;
      };
    }[];
  }[] = [];

  let filteredContacts: TContactBulkUploadContact[] = [];

  contacts.forEach((contact, idx) => {
    const emailAttr = contact.attributes.find((attr) => attr.attributeKey.key === emailAttributeKey);

    if (emailAttr && contactMap.has(emailAttr.value)) {
      // if all the attributes passed are the same as the existing attributes, skip the update:
      const existingContact = contactMap.get(emailAttr.value);
      if (existingContact) {
        // Create maps of existing attributes by key
        const existingAttributesByKey = new Map(
          existingContact.attributes.map((attr) => [attr.attributeKey.key, attr.value])
        );

        // Determine which attributes need updating by comparing values.
        const attributesToUpdate = contact.attributes.filter(
          (attr) => existingAttributesByKey.get(attr.attributeKey.key) !== attr.value
        );

        // Check if any attributes need updating
        const needsUpdate = attributesToUpdate.length > 0;

        if (!needsUpdate) {
          filteredContacts.push(contact);
          // No attributes need to be updated
          return;
        }

        // if the attributes to update have a userId that exists in the db, we need to skip the update
        const userIdAttr = attributesToUpdate.find((attr) => attr.attributeKey.key === "userId");

        if (userIdAttr) {
          const existingUserId = existingUserIds.find(
            (existingUserId) => existingUserId.value === userIdAttr.value
          );

          if (existingUserId) {
            contactIdxWithConflictingUserIds.push(idx);
            return;
          }
        }

        filteredContacts.push(contact);
        contactsToUpdate.push({
          contactId: existingContact.contactId,
          attributes: attributesToUpdate.map((attr) => {
            const existingAttr = existingContact.attributes.find(
              (a) => a.attributeKey.key === attr.attributeKey.key
            );

            if (!existingAttr) {
              return {
                id: createId(),
                createdAt: new Date(),
                value: attr.value,
                attributeKey: attr.attributeKey,
              };
            }

            return {
              id: existingAttr.id,
              createdAt: existingAttr.createdAt,
              value: attr.value,
              attributeKey: attr.attributeKey,
            };
          }),
        });
      }
    } else {
      // There can't be a case where the emailAttr is not defined since that should be caught by zod.

      // if the contact has a userId that already exists in the db, we need to skip the create
      const userIdAttr = contact.attributes.find((attr) => attr.attributeKey.key === "userId");
      if (userIdAttr) {
        const existingUserId = existingUserIds.find(
          (existingUserId) => existingUserId.value === userIdAttr.value
        );

        if (existingUserId) {
          contactIdxWithConflictingUserIds.push(idx);
          return;
        }
      }

      filteredContacts.push(contact);
      contactsToCreate.push(contact);
    }
  });

  try {
    // Execute everything in ONE transaction
    await prisma.$transaction(
      async (tx) => {
        const attributeKeyMap = existingAttributeKeys.reduce<Record<string, string>>((acc, keyObj) => {
          acc[keyObj.key] = keyObj.id;
          return acc;
        }, {});

        // Check for missing attribute keys and create them if needed.
        const missingKeysMap = new Map<string, { key: string; name: string }>();
        const attributeKeyNameUpdates = new Map<string, { key: string; name: string }>();

        for (const contact of filteredContacts) {
          for (const attr of contact.attributes) {
            if (!attributeKeyMap[attr.attributeKey.key]) {
              missingKeysMap.set(attr.attributeKey.key, attr.attributeKey);
            } else {
              // Check if the name has changed for existing attribute keys
              const existingKey = existingAttributeKeys.find((ak) => ak.key === attr.attributeKey.key);
              if (existingKey && existingKey.name !== attr.attributeKey.name) {
                attributeKeyNameUpdates.set(attr.attributeKey.key, attr.attributeKey);
              }
            }
          }
        }

        // Handle both missing keys and name updates in a single batch operation
        const keysToUpsert = new Map<string, { key: string; name: string }>();

        // Collect all keys that need to be created or updated
        for (const [key, value] of missingKeysMap) {
          keysToUpsert.set(key, value);
        }

        for (const [key, value] of attributeKeyNameUpdates) {
          keysToUpsert.set(key, value);
        }

        if (keysToUpsert.size > 0) {
          const keysArray = Array.from(keysToUpsert.values());
          const BATCH_SIZE = 10000;

          for (let i = 0; i < keysArray.length; i += BATCH_SIZE) {
            const batch = keysArray.slice(i, i + BATCH_SIZE);

            // Use raw query to perform upsert
            const upsertedKeys = await tx.$queryRaw<{ id: string; key: string }[]>`
              INSERT INTO "ContactAttributeKey" ("id", "key", "name", "environmentId", "created_at", "updated_at")
              SELECT
                unnest(${Prisma.sql`ARRAY[${batch.map(() => createId())}]`}),
                unnest(${Prisma.sql`ARRAY[${batch.map((k) => k.key)}]`}),
                unnest(${Prisma.sql`ARRAY[${batch.map((k) => k.name)}]`}),
                ${environmentId},
                NOW(),
                NOW()
              ON CONFLICT ("key", "environmentId")
              DO UPDATE SET
                "name" = EXCLUDED."name",
                "updated_at" = NOW()
              RETURNING "id", "key"
            `;

            // Update attribute key map with upserted keys
            for (const key of upsertedKeys) {
              attributeKeyMap[key.key] = key.id;
            }
          }
        }

        // Create new contacts -- should be at most 1000, no need to batch
        const newContacts = contactsToCreate.map(() => ({
          id: createId(),
          environmentId,
        }));

        if (newContacts.length > 0) {
          await tx.contact.createMany({
            data: newContacts,
          });
        }

        // Prepare attributes for both new and existing contacts
        const attributesUpsertForCreatedUsers = contactsToCreate.flatMap((contact, idx) =>
          contact.attributes.map((attr) => ({
            id: createId(),
            contactId: newContacts[idx].id,
            attributeKeyId: attributeKeyMap[attr.attributeKey.key],
            value: attr.value,
            createdAt: new Date(),
            updatedAt: new Date(),
          }))
        );

        const attributesUpsertForExistingUsers = contactsToUpdate.flatMap((contact) =>
          contact.attributes.map((attr) => ({
            id: attr.id,
            contactId: contact.contactId,
            attributeKeyId: attributeKeyMap[attr.attributeKey.key],
            value: attr.value,
            createdAt: attr.createdAt,
            updatedAt: new Date(),
          }))
        );

        const attributesToUpsert = [...attributesUpsertForCreatedUsers, ...attributesUpsertForExistingUsers];

        // Skip the raw query if there are no attributes to upsert
        if (attributesToUpsert.length > 0) {
          // Process attributes in batches of 10,000
          const BATCH_SIZE = 10000;
          for (let i = 0; i < attributesToUpsert.length; i += BATCH_SIZE) {
            const batch = attributesToUpsert.slice(i, i + BATCH_SIZE);

            // Use a raw query to perform a bulk insert with an ON CONFLICT clause
            await tx.$executeRaw`
              INSERT INTO "ContactAttribute" (
                "id", "created_at", "updated_at", "contactId", "value", "attributeKeyId"
              )
              SELECT
                unnest(${Prisma.sql`ARRAY[${batch.map((a) => a.id)}]`}),
                unnest(${Prisma.sql`ARRAY[${batch.map((a) => a.createdAt)}]`}),
                unnest(${Prisma.sql`ARRAY[${batch.map((a) => a.updatedAt)}]`}),
                unnest(${Prisma.sql`ARRAY[${batch.map((a) => a.contactId)}]`}),
                unnest(${Prisma.sql`ARRAY[${batch.map((a) => a.value)}]`}),
                unnest(${Prisma.sql`ARRAY[${batch.map((a) => a.attributeKeyId)}]`})
              ON CONFLICT ("contactId", "attributeKeyId") DO UPDATE SET
                "value" = EXCLUDED."value",
                "updated_at" = EXCLUDED."updated_at"
            `;
          }
        }

        contactCache.revalidate({
          environmentId,
        });

        // revalidate all the new contacts:
        for (const newContact of newContacts) {
          contactCache.revalidate({
            id: newContact.id,
          });
        }

        // revalidate all the existing contacts:
        for (const existingContact of existingContactsByEmail) {
          contactCache.revalidate({
            id: existingContact.id,
          });
        }

        contactAttributeKeyCache.revalidate({
          environmentId,
        });

        contactAttributeCache.revalidate({ environmentId });
      },
      { timeout: 60 * 1000 }
    );

    return ok({
      contactIdxWithConflictingUserIds,
    });
  } catch (error) {
    logger.error({ error }, "Failed to upsert contacts");

    return err({
      type: "internal_server_error",
      details: [{ field: "error", issue: "Failed to upsert contacts" }],
    });
  }
};
@@ -0,0 +1,59 @@
import { ZContactBulkUploadRequest } from "@/modules/ee/contacts/types/contact";
import { z } from "zod";
import { ZodOpenApiOperationObject, ZodOpenApiPathsObject } from "zod-openapi";

const bulkContactEndpoint: ZodOpenApiOperationObject = {
  operationId: "uploadBulkContacts",
  summary: "Upload Bulk Contacts",
  description: "Uploads contacts in bulk",
  requestBody: {
    required: true,
    description: "The contacts to upload",
    content: {
      "application/json": {
        schema: ZContactBulkUploadRequest,
      },
    },
  },
  tags: ["Management API > Contacts"],
  responses: {
    "200": {
      description: "Contacts uploaded successfully.",
      content: {
        "application/json": {
          schema: z.object({
            data: z.object({
              status: z.string(),
              message: z.string(),
            }),
          }),
        },
      },
    },
    "207": {
      description: "Contacts uploaded partially successfully.",
      content: {
        "application/json": {
          schema: z.object({
            data: z.object({
              status: z.string(),
              message: z.string(),
              skippedContacts: z.array(
                z.object({
                  index: z.number(),
                  userId: z.string(),
                })
              ),
            }),
          }),
        },
      },
    },
  },
};

export const bulkContactPaths: ZodOpenApiPathsObject = {
  "/contacts/bulk": {
    put: bulkContactEndpoint,
  },
};
@@ -0,0 +1,476 @@
import { contactCache } from "@/lib/cache/contact";
import { contactAttributeCache } from "@/lib/cache/contact-attribute";
import { contactAttributeKeyCache } from "@/lib/cache/contact-attribute-key";
import { upsertBulkContacts } from "@/modules/ee/contacts/api/v2/management/contacts/bulk/lib/contact";
import { beforeEach, describe, expect, test, vi } from "vitest";
import { prisma } from "@formbricks/database";

// Ensure that createId always returns "mock-id" for predictability
vi.mock("@paralleldrive/cuid2", () => ({
  createId: vi.fn(() => "mock-id"),
}));

// Mock prisma methods
vi.mock("@formbricks/database", () => ({
  prisma: {
    contactAttribute: {
      findMany: vi.fn(),
    },
    contactAttributeKey: {
      findMany: vi.fn(),
      createManyAndReturn: vi.fn(),
    },
    contact: {
      findMany: vi.fn(),
      createMany: vi.fn(),
    },
    $transaction: vi.fn((callback) => callback(prisma)),
    $executeRaw: vi.fn(),
    $queryRaw: vi.fn(),
  },
}));

// Mock cache functions
vi.mock("@/lib/cache/contact", () => ({
  contactCache: {
    revalidate: vi.fn(),
    tag: {
      byId: (id: string) => `contacts-${id}`,
      byEnvironmentId: (environmentId: string) => `environments-${environmentId}-contacts`,
    },
  },
}));

vi.mock("@/lib/cache/contact-attribute", () => ({
  contactAttributeCache: {
    revalidate: vi.fn(),
    tag: {
      byEnvironmentId: (environmentId: string) => `contactAttributes-${environmentId}`,
    },
  },
}));

vi.mock("@/lib/cache/contact-attribute-key", () => ({
  contactAttributeKeyCache: {
    revalidate: vi.fn(),
    tag: {
      byEnvironmentId: (environmentId: string) => `environments-${environmentId}-contactAttributeKeys`,
    },
  },
}));

describe("upsertBulkContacts", () => {
  const mockEnvironmentId = "env_123";

  beforeEach(() => {
    vi.clearAllMocks();
  });

  test("should create new contacts when all provided contacts have unique user IDs and emails", async () => {
    // Mock data: two contacts with unique userId and email
    const mockContacts = [
      {
        attributes: [
          { attributeKey: { key: "email", name: "Email" }, value: "john@example.com" },
          { attributeKey: { key: "userId", name: "User ID" }, value: "user-123" },
          { attributeKey: { key: "name", name: "Name" }, value: "John Doe" },
        ],
      },
      {
        attributes: [
          { attributeKey: { key: "email", name: "Email" }, value: "jane@example.com" },
          { attributeKey: { key: "userId", name: "User ID" }, value: "user-456" },
          { attributeKey: { key: "name", name: "Name" }, value: "Jane Smith" },
        ],
      },
    ];

    const mockParsedEmails = ["john@example.com", "jane@example.com"];

    // Mock: no existing userIds in DB
    vi.mocked(prisma.contactAttribute.findMany).mockResolvedValueOnce([]);
    // Mock: all attribute keys already exist
    const mockAttributeKeys = [
      { id: "attr-key-email", key: "email", environmentId: mockEnvironmentId, name: "Email" },
      { id: "attr-key-userId", key: "userId", environmentId: mockEnvironmentId, name: "User ID" },
      { id: "attr-key-name", key: "name", environmentId: mockEnvironmentId, name: "Name" },
    ];
    vi.mocked(prisma.contactAttributeKey.findMany).mockResolvedValueOnce(mockAttributeKeys);
    // Mock: no existing contacts by email
    vi.mocked(prisma.contact.findMany).mockResolvedValueOnce([]);

    // Execute the function
    const result = await upsertBulkContacts(mockContacts, mockEnvironmentId, mockParsedEmails);

    // Assert that the result is ok and data is as expected
    if (!result.ok) throw new Error("Expected result.ok to be true");
    expect(result.data).toEqual({ contactIdxWithConflictingUserIds: [] });

    // Verify that existing user IDs were checked
    expect(prisma.contactAttribute.findMany).toHaveBeenCalledWith({
      where: {
        attributeKey: {
          environmentId: mockEnvironmentId,
          key: "userId",
        },
        value: {
          in: ["user-123", "user-456"],
        },
      },
      select: { value: true },
    });

    // Verify that attribute keys were fetched
    expect(prisma.contactAttributeKey.findMany).toHaveBeenCalledWith({
      where: {
        key: { in: ["email", "userId", "name"] },
        environmentId: mockEnvironmentId,
      },
    });

    // Verify that existing contacts were looked up by email
    expect(prisma.contact.findMany).toHaveBeenCalledWith({
      where: {
        environmentId: mockEnvironmentId,
        attributes: {
          some: {
            attributeKey: { key: "email" },
            value: { in: mockParsedEmails },
          },
        },
      },
      select: {
        attributes: {
          select: {
            attributeKey: { select: { key: true } },
            createdAt: true,
            id: true,
            value: true,
          },
        },
        id: true,
      },
    });

    // Verify that new contacts were created in the transaction
    expect(prisma.contact.createMany).toHaveBeenCalledWith({
      data: [
        { id: "mock-id", environmentId: mockEnvironmentId },
        { id: "mock-id", environmentId: mockEnvironmentId },
      ],
    });

    // Verify that the raw SQL query was executed to upsert attributes
    expect(prisma.$executeRaw).toHaveBeenCalled();

    // Verify that caches were revalidated
    expect(contactCache.revalidate).toHaveBeenCalledWith({ environmentId: mockEnvironmentId });
    // Since two new contacts are created with same id "mock-id", expect at least one revalidation with id "mock-id"
    expect(contactCache.revalidate).toHaveBeenCalledWith({ id: "mock-id" });
    expect(contactAttributeKeyCache.revalidate).toHaveBeenCalledWith({ environmentId: mockEnvironmentId });
    expect(contactAttributeCache.revalidate).toHaveBeenCalledWith({ environmentId: mockEnvironmentId });
  });

  test("should update existing contacts when provided contacts match an existing email", async () => {
    // Mock data: a contact that exists in the DB
    const mockContacts = [
      {
        attributes: [
          { attributeKey: { key: "email", name: "Email" }, value: "john@example.com" },
          // No userId is provided so it should be treated as update
        ],
      },
    ];

    const mockParsedEmails = ["john@example.com"];

    // Mock: no existing userIds conflict
    vi.mocked(prisma.contactAttribute.findMany).mockResolvedValueOnce([]);
    // Mock: attribute keys for email exist
    const mockAttributeKeys = [
      { id: "attr-key-email", key: "email", environmentId: mockEnvironmentId, name: "Email" },
    ];
    vi.mocked(prisma.contactAttributeKey.findMany).mockResolvedValueOnce(mockAttributeKeys);
    // Mock: an existing contact with the same email
    vi.mocked(prisma.contact.findMany).mockResolvedValueOnce([
      {
        id: "existing-contact-id",
        attributes: [
          {
            id: "existing-email-attr",
            attributeKey: { key: "email", name: "Email" },
            value: "john@example.com",
            createdAt: new Date("2023-01-01"),
          },
        ],
      },
    ]);

    // Execute the function
    const result = await upsertBulkContacts(mockContacts, mockEnvironmentId, mockParsedEmails);

    if (!result.ok) throw new Error("Expected result.ok to be true");
    expect(result.data).toEqual({ contactIdxWithConflictingUserIds: [] });
  });

  test("should return the indices of contacts with conflicting user IDs", async () => {
    // Mock data - mix of valid and conflicting contacts
    const mockContacts = [
      {
        // Contact 0: Valid contact with unique userId
        attributes: [
          { attributeKey: { key: "email", name: "Email" }, value: "john@example.com" },
          { attributeKey: { key: "userId", name: "User ID" }, value: "user-123" },
          { attributeKey: { key: "name", name: "Name" }, value: "John Doe" },
        ],
      },
      {
        // Contact 1: Conflicting contact (userId already exists)
        attributes: [
          { attributeKey: { key: "email", name: "Email" }, value: "jane@example.com" },
          { attributeKey: { key: "userId", name: "User ID" }, value: "existing-user-1" },
          { attributeKey: { key: "name", name: "Name" }, value: "Jane Smith" },
        ],
      },
      {
        // Contact 2: Valid contact with no userId
        attributes: [
          { attributeKey: { key: "email", name: "Email" }, value: "bob@example.com" },
          { attributeKey: { key: "name", name: "Name" }, value: "Bob Johnson" },
        ],
      },
      {
        // Contact 3: Conflicting contact (userId already exists)
        attributes: [
          { attributeKey: { key: "email", name: "Email" }, value: "alice@example.com" },
          { attributeKey: { key: "userId", name: "User ID" }, value: "existing-user-2" },
          { attributeKey: { key: "name", name: "Name" }, value: "Alice Brown" },
        ],
      },
    ];

    const mockParsedEmails = ["john@example.com", "jane@example.com", "bob@example.com", "alice@example.com"];

    // Mock existing user IDs - these will conflict with some of our contacts
    const mockExistingUserIds = [{ value: "existing-user-1" }, { value: "existing-user-2" }];
    vi.mocked(prisma.contactAttribute.findMany).mockResolvedValueOnce(mockExistingUserIds);

    // Mock attribute keys
    const mockAttributeKeys = [
      { id: "attr-key-email", key: "email", environmentId: mockEnvironmentId },
      { id: "attr-key-userId", key: "userId", environmentId: mockEnvironmentId },
      { id: "attr-key-name", key: "name", environmentId: mockEnvironmentId },
    ];
    vi.mocked(prisma.contactAttributeKey.findMany).mockResolvedValueOnce(mockAttributeKeys);

    // Mock existing contacts (none for this test case)
    vi.mocked(prisma.contact.findMany).mockResolvedValueOnce([]);

    // Execute the function
    const result = await upsertBulkContacts(mockContacts, mockEnvironmentId, mockParsedEmails);

    if (result.ok) {
      // Assertions - verify that the function correctly identified contacts with conflicting user IDs
      expect(result.data.contactIdxWithConflictingUserIds).toEqual([1, 3]);

      // Verify that the function checked for existing user IDs
      expect(prisma.contactAttribute.findMany).toHaveBeenCalledWith({
        where: {
          attributeKey: {
            environmentId: mockEnvironmentId,
            key: "userId",
          },
          value: {
            in: ["user-123", "existing-user-1", "existing-user-2"],
          },
        },
        select: {
          value: true,
        },
      });

      // Verify that the function fetched attribute keys for the filtered contacts (without conflicting userIds)
      expect(prisma.contactAttributeKey.findMany).toHaveBeenCalled();

      // Verify that the function checked for existing contacts by email
      expect(prisma.contact.findMany).toHaveBeenCalledWith({
        where: {
          environmentId: mockEnvironmentId,
          attributes: {
            some: {
              attributeKey: { key: "email" },
              value: { in: mockParsedEmails },
            },
          },
        },
        select: {
          attributes: {
            select: {
              attributeKey: { select: { key: true } },
              createdAt: true,
              id: true,
              value: true,
            },
          },
          id: true,
        },
      });

      // Verify that only non-conflicting contacts were processed
      expect(prisma.contact.createMany).toHaveBeenCalledWith({
        data: [
          { id: "mock-id", environmentId: mockEnvironmentId },
          { id: "mock-id", environmentId: mockEnvironmentId },
        ],
      });

      // Verify that the transaction was executed
      expect(prisma.$transaction).toHaveBeenCalled();

      // Verify that caches were revalidated
      expect(contactCache.revalidate).toHaveBeenCalledWith({
        environmentId: mockEnvironmentId,
      });
      expect(contactAttributeKeyCache.revalidate).toHaveBeenCalledWith({
        environmentId: mockEnvironmentId,
      });
      expect(contactAttributeCache.revalidate).toHaveBeenCalledWith({
        environmentId: mockEnvironmentId,
      });
    }
  });

  test("should create missing attribute keys when they are not found in the database", async () => {
    // Mock data: contacts with attributes that include missing attribute keys
    const mockContacts = [
      {
        attributes: [
          { attributeKey: { key: "email", name: "Email" }, value: "john@example.com" },
          { attributeKey: { key: "newKey1", name: "New Key 1" }, value: "value1" },
        ],
      },
      {
        attributes: [
          { attributeKey: { key: "email", name: "Email" }, value: "jane@example.com" },
          { attributeKey: { key: "newKey2", name: "New Key 2" }, value: "value2" },
        ],
      },
    ];
    const mockParsedEmails = ["john@example.com", "jane@example.com"];

    // Mock: no existing user IDs
    vi.mocked(prisma.contactAttribute.findMany).mockResolvedValueOnce([]);
    // Mock: only "email" exists; new keys are missing
    const mockAttributeKeys = [
      { id: "attr-key-email", key: "email", environmentId: mockEnvironmentId, name: "Email" },
      { id: "attr-key-newKey1", key: "newKey1", environmentId: mockEnvironmentId, name: "New Key 1" },
      { id: "attr-key-newKey2", key: "newKey2", environmentId: mockEnvironmentId, name: "New Key 2" },
    ];

    vi.mocked(prisma.contactAttributeKey.findMany).mockResolvedValueOnce(mockAttributeKeys);

    // Mock: no existing contacts for update
    vi.mocked(prisma.contact.findMany).mockResolvedValueOnce([]);

    // Execute the function
    const result = await upsertBulkContacts(mockContacts, mockEnvironmentId, mockParsedEmails);

    // creation of new attribute keys now happens with a raw query
    // so we need to mock that
    vi.mocked(prisma.$queryRaw).mockResolvedValue([
      { id: "attr-key-newKey1", key: "newKey1" },
      { id: "attr-key-newKey2", key: "newKey2" },
    ]);

    if (!result.ok) throw new Error("Expected result.ok to be true");
    expect(result.data).toEqual({ contactIdxWithConflictingUserIds: [] });

    // Verify that new contacts were created
    expect(prisma.contact.createMany).toHaveBeenCalledWith({
      data: [
        { id: "mock-id", environmentId: mockEnvironmentId },
        { id: "mock-id", environmentId: mockEnvironmentId },
      ],
    });

    // Verify that the raw SQL query was executed for inserting attributes
    expect(prisma.$executeRaw).toHaveBeenCalled();

    // Verify that caches were revalidated
    expect(contactAttributeKeyCache.revalidate).toHaveBeenCalledWith({
      environmentId: mockEnvironmentId,
    });
  });

  test("should update attribute key names when they change", async () => {
    // Mock data: a contact with an attribute that has a new name for an existing key
    const mockContacts = [
      {
        attributes: [
          { attributeKey: { key: "email", name: "Email" }, value: "john@example.com" },
          { attributeKey: { key: "name", name: "Full Name" }, value: "John Doe" }, // Changed name from "Name" to "Full Name"
        ],
      },
    ];

    const mockParsedEmails = ["john@example.com"];

    // Mock: no existing userIds conflict
    vi.mocked(prisma.contactAttribute.findMany).mockResolvedValueOnce([]);

    // Mock: attribute keys exist but with different names
    const mockAttributeKeys = [
      { id: "attr-key-email", key: "email", environmentId: mockEnvironmentId, name: "Email" },
      { id: "attr-key-name", key: "name", environmentId: mockEnvironmentId, name: "Name" }, // Original name
    ];
    vi.mocked(prisma.contactAttributeKey.findMany).mockResolvedValueOnce(mockAttributeKeys);

    // Mock: an existing contact
    vi.mocked(prisma.contact.findMany).mockResolvedValueOnce([
      {
        id: "existing-contact-id",
        attributes: [
          {
            id: "existing-email-attr",
            attributeKey: { key: "email", name: "Email" },
            value: "john@example.com",
            createdAt: new Date("2023-01-01"),
          },
          {
            id: "existing-name-attr",
            attributeKey: { key: "name", name: "Name" },
            value: "John Doe",
            createdAt: new Date("2023-01-01"),
          },
        ],
      },
    ]);

    // Mock the transaction
    const mockTransaction = {
      contact: {
        createMany: vi.fn().mockResolvedValue({ count: 0 }),
      },
      $executeRaw: vi.fn().mockResolvedValue({ count: 0 }),
      $queryRaw: vi.fn().mockResolvedValue([{ id: "attr-key-name", key: "name", name: "Full Name" }]),
    };

    vi.mocked(prisma.$transaction).mockImplementationOnce((callback) => {
      return callback(mockTransaction as any);
    });

    // Execute the function
    const result = await upsertBulkContacts(mockContacts, mockEnvironmentId, mockParsedEmails);

    if (!result.ok) throw new Error("Expected result.ok to be true");
    expect(result.data).toEqual({ contactIdxWithConflictingUserIds: [] });

    // Verify that the raw SQL query was executed for updating attribute keys
    vi.mocked(prisma.$queryRaw).mockResolvedValue([{ id: "attr-key-name", key: "name", name: "Full Name" }]);

    // Verify that caches were revalidated
    expect(contactAttributeKeyCache.revalidate).toHaveBeenCalledWith({
      environmentId: mockEnvironmentId,
    });
  });
});
@@ -0,0 +1,61 @@
import { responses } from "@/modules/api/v2/lib/response";
import { handleApiError } from "@/modules/api/v2/lib/utils";
import { authenticatedApiClient } from "@/modules/api/v2/management/auth/authenticated-api-client";
import { upsertBulkContacts } from "@/modules/ee/contacts/api/v2/management/contacts/bulk/lib/contact";
import { ZContactBulkUploadRequest } from "@/modules/ee/contacts/types/contact";
import { getIsContactsEnabled } from "@/modules/ee/license-check/lib/utils";

export const PUT = async (request: Request) =>
  authenticatedApiClient({
    request,
    schemas: {
      body: ZContactBulkUploadRequest,
    },
    handler: async ({ authentication, parsedInput }) => {
      const isContactsEnabled = await getIsContactsEnabled();
      if (!isContactsEnabled) {
        return handleApiError(request, {
          type: "forbidden",
          details: [{ field: "error", issue: "Contacts are not enabled for this environment." }],
        });
      }

      const { contacts } = parsedInput.body ?? { contacts: [] };
      const { environmentId } = authentication;

      const emails = contacts.map(
        (contact) => contact.attributes.find((attr) => attr.attributeKey.key === "email")?.value!
      );

      const upsertBulkContactsResult = await upsertBulkContacts(contacts, environmentId, emails);

      if (!upsertBulkContactsResult.ok) {
        return handleApiError(request, upsertBulkContactsResult.error);
      }

      const { contactIdxWithConflictingUserIds } = upsertBulkContactsResult.data;

      if (contactIdxWithConflictingUserIds.length) {
        return responses.multiStatusResponse({
          data: {
            status: "success",
            message:
              "Contacts bulk upload partially successful. Some contacts were skipped due to conflicting userIds.",
            meta: {
              skippedContacts: contactIdxWithConflictingUserIds.map((idx) => ({
                index: idx,
                userId: contacts[idx].attributes.find((attr) => attr.attributeKey.key === "userId")?.value,
              })),
            },
          },
        });
      }

      return responses.successResponse({
        data: {
          status: "success",
          message: "Contacts bulk upload successful",
        },
      });
    },
  });
@@ -107,3 +107,138 @@ export const ZContactCSVAttributeMap = z.record(z.string(), z.string()).superRef
  }
});
export type TContactCSVAttributeMap = z.infer<typeof ZContactCSVAttributeMap>;

export const ZContactBulkUploadAttribute = z.object({
  attributeKey: z.object({
    key: z.string(),
    name: z.string(),
  }),
  value: z.string(),
});

export const ZContactBulkUploadContact = z.object({
  attributes: z.array(ZContactBulkUploadAttribute),
});

export type TContactBulkUploadContact = z.infer<typeof ZContactBulkUploadContact>;

export const ZContactBulkUploadRequest = z.object({
  contacts: z
    .array(ZContactBulkUploadContact)
    .max(1000, { message: "Maximum 1000 contacts allowed at a time." })
    .superRefine((contacts, ctx) => {
      // Track all data in a single pass
      const seenEmails = new Set<string>();
      const duplicateEmails = new Set<string>();
      const seenUserIds = new Set<string>();
      const duplicateUserIds = new Set<string>();
      const contactsWithDuplicateKeys: { idx: number; duplicateKeys: string[] }[] = [];

      // Process each contact in a single pass
      contacts.forEach((contact, idx) => {
        // 1. Check email existence and validity
        const emailAttr = contact.attributes.find((attr) => attr.attributeKey.key === "email");
        if (!emailAttr?.value) {
          ctx.addIssue({
            code: z.ZodIssueCode.custom,
            message: `Missing email attribute for contact at index ${idx}`,
          });
        } else {
          // Check email format
          const parsedEmail = z.string().email().safeParse(emailAttr.value);
          if (!parsedEmail.success) {
            ctx.addIssue({
              code: z.ZodIssueCode.custom,
              message: `Invalid email for contact at index ${idx}`,
            });
          }

          // Check for duplicate emails
          if (seenEmails.has(emailAttr.value)) {
            duplicateEmails.add(emailAttr.value);
          } else {
            seenEmails.add(emailAttr.value);
          }
        }

        // 2. Check for userId duplicates
        const userIdAttr = contact.attributes.find((attr) => attr.attributeKey.key === "userId");
        if (userIdAttr?.value) {
          if (seenUserIds.has(userIdAttr.value)) {
            duplicateUserIds.add(userIdAttr.value);
          } else {
            seenUserIds.add(userIdAttr.value);
          }
        }

        // 3. Check for duplicate attribute keys within the same contact
        const keyOccurrences = new Map<string, number>();
        const duplicateKeysForContact: string[] = [];

        contact.attributes.forEach((attr) => {
          const key = attr.attributeKey.key;
          const count = (keyOccurrences.get(key) || 0) + 1;
          keyOccurrences.set(key, count);

          // If this is the second occurrence, add to duplicates
          if (count === 2) {
            duplicateKeysForContact.push(key);
          }
        });

        if (duplicateKeysForContact.length > 0) {
          contactsWithDuplicateKeys.push({ idx, duplicateKeys: duplicateKeysForContact });
        }
      });

      // Report all validation issues after the single pass
      if (duplicateEmails.size > 0) {
        ctx.addIssue({
          code: z.ZodIssueCode.custom,
          message: "Duplicate emails found in the records, please ensure each email is unique.",
          params: {
            duplicateEmails: Array.from(duplicateEmails),
          },
        });
      }

      if (duplicateUserIds.size > 0) {
        ctx.addIssue({
          code: z.ZodIssueCode.custom,
          message: "Duplicate userIds found in the records, please ensure each userId is unique.",
          params: {
            duplicateUserIds: Array.from(duplicateUserIds),
          },
        });
      }

      if (contactsWithDuplicateKeys.length > 0) {
        ctx.addIssue({
          code: z.ZodIssueCode.custom,
          message:
            "Duplicate attribute keys found in the records, please ensure each attribute key is unique.",
          params: {
            contactsWithDuplicateKeys,
          },
        });
      }
    }),
});

export type TContactBulkUploadRequest = z.infer<typeof ZContactBulkUploadRequest>;

export type TContactBulkUploadResponseBase = {
  status: "success" | "error";
  message: string;
};

export type TContactBulkUploadResponseError = TContactBulkUploadResponseBase & {
  status: "error";
  message: string;
  errors: Record<string, string>[];
};

export type TContactBulkUploadResponseSuccess = TContactBulkUploadResponseBase & {
  processed: number;
  failed: number;
};
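As a sketch of how the superRefine above reports duplicates (payload invented for illustration): parsing a request where two contacts share an email fails, and the duplicate values are attached as params, which formatZodError then exposes as meta in the API error details:

// Two contacts with the same email: parsing fails with a custom issue.
const parsed = ZContactBulkUploadRequest.safeParse({
  contacts: [
    { attributes: [{ attributeKey: { key: "email", name: "Email" }, value: "dup@example.com" }] },
    { attributes: [{ attributeKey: { key: "email", name: "Email" }, value: "dup@example.com" }] },
  ],
});
// parsed.success === false; among parsed.error.issues:
// {
//   code: "custom",
//   message: "Duplicate emails found in the records, please ensure each email is unique.",
//   params: { duplicateEmails: ["dup@example.com"] },
// }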
@@ -1,6 +1,6 @@
 // vitest.config.ts
 import react from "@vitejs/plugin-react";
-import { loadEnv } from "vite";
+import { PluginOption, loadEnv } from "vite";
 import tsconfigPaths from "vite-tsconfig-paths";
 import { defineConfig } from "vitest/config";

@@ -47,6 +47,7 @@ export default defineConfig({
       "modules/survey/lib/client-utils.ts",
       "modules/survey/list/components/survey-card.tsx",
       "modules/survey/list/components/survey-dropdown-menu.tsx",
+      "modules/ee/contacts/api/v2/management/contacts/bulk/lib/contact.ts",
     ],
     exclude: [
       "**/.next/**",
@@ -59,5 +60,5 @@ export default defineConfig({
     ],
   },
-  plugins: [tsconfigPaths(), react()],
+  plugins: [tsconfigPaths(), react() as PluginOption],
 });