moves bulk contacts upsert logic to a service

This commit is contained in:
pandeymangg
2025-03-21 13:24:15 +05:30
parent b5212e0e0e
commit ed870ea0ce
9 changed files with 275 additions and 303 deletions

View File

@@ -0,0 +1,248 @@
import { contactCache } from "@/lib/cache/contact";
import { contactAttributeCache } from "@/lib/cache/contact-attribute";
import { contactAttributeKeyCache } from "@/lib/cache/contact-attribute-key";
import { TContactBulkUploadContact } from "@/modules/ee/contacts/types/contact";
import { createId } from "@paralleldrive/cuid2";
import { Prisma } from "@prisma/client";
import { prisma } from "@formbricks/database";
export const upsertBulkContacts = async (
contacts: TContactBulkUploadContact[],
environmentId: string,
parsedEmails: string[]
) => {
const emailKey = "email";
// Get unique attribute keys from the payload
const keys = Array.from(
new Set(contacts.flatMap((contact) => contact.attributes.map((attr) => attr.attributeKey.key)))
);
// 2. Fetch attribute key records for these keys in this environment
const attributeKeys = await prisma.contactAttributeKey.findMany({
where: {
key: { in: keys },
environmentId,
},
});
const attributeKeyMap = attributeKeys.reduce<Record<string, string>>((acc, keyObj) => {
acc[keyObj.key] = keyObj.id;
return acc;
}, {});
// 2a. Check for missing attribute keys and create them if needed.
const missingKeysMap = new Map<string, { key: string; name: string }>();
for (const contact of contacts) {
for (const attr of contact.attributes) {
if (!attributeKeyMap[attr.attributeKey.key]) {
missingKeysMap.set(attr.attributeKey.key, attr.attributeKey);
}
}
}
// 3. Find existing contacts by matching email attribute
const existingContacts = await prisma.contact.findMany({
where: {
environmentId,
attributes: {
some: {
attributeKey: { key: emailKey },
value: { in: parsedEmails },
},
},
},
select: {
attributes: {
select: {
attributeKey: { select: { key: true } },
createdAt: true,
id: true,
value: true,
},
},
id: true,
},
});
// Build a map from email to contact id (if the email attribute exists)
const contactMap = new Map<
string,
{ contactId: string; attributes: { id: string; attributeKey: { key: string }; createdAt: Date }[] }
>();
existingContacts.forEach((contact) => {
const emailAttr = contact.attributes.find((attr) => attr.attributeKey.key === emailKey);
if (emailAttr) {
contactMap.set(emailAttr.value, {
contactId: contact.id,
attributes: contact.attributes.map((attr) => ({
id: attr.id,
attributeKey: { key: attr.attributeKey.key },
createdAt: attr.createdAt,
})),
});
}
});
// 4. Split contacts into ones to update and ones to create
const contactsToUpdate: {
contactId: string;
attributes: {
id: string;
createdAt: Date;
value: string;
attributeKey: {
key: string;
};
}[];
}[] = [];
const contactsToCreate: {
attributes: {
value: string;
attributeKey: {
key: string;
};
}[];
}[] = [];
for (const contact of contacts) {
const emailAttr = contact.attributes.find((attr) => attr.attributeKey.key === emailKey);
if (emailAttr && contactMap.has(emailAttr.value)) {
contactsToUpdate.push({
contactId: contactMap.get(emailAttr.value)!.contactId,
attributes: contact.attributes.map((attr) => {
const existingAttr = contactMap
.get(emailAttr.value)!
.attributes.find((a) => a.attributeKey.key === attr.attributeKey.key);
if (!existingAttr) {
// Should never happen, just to be safe and satisfy typescript
return {
id: createId(),
createdAt: new Date(),
value: attr.value,
attributeKey: attr.attributeKey,
};
}
return {
id: existingAttr.id,
createdAt: existingAttr.createdAt,
value: attr.value,
attributeKey: attr.attributeKey,
};
}),
});
} else {
contactsToCreate.push(contact);
}
}
// 5. Execute everything in ONE transaction
await prisma.$transaction(async (tx) => {
// Create missing attribute keys if needed (moved inside transaction)
if (missingKeysMap.size > 0) {
const missingKeysArray = Array.from(missingKeysMap.values());
// Create missing attribute keys in a batch
const newAttributeKeys = await tx.contactAttributeKey.createManyAndReturn({
data: missingKeysArray.map((keyObj) => ({
key: keyObj.key,
name: keyObj.name,
environmentId,
})),
select: { key: true, id: true },
skipDuplicates: true,
});
// Refresh the attribute key map for the missing keys
for (const attrKey of newAttributeKeys) {
attributeKeyMap[attrKey.key] = attrKey.id;
}
}
// Create new contacts
const newContacts = contactsToCreate.map(() => ({
id: createId(),
environmentId,
}));
if (newContacts.length > 0) {
await tx.contact.createMany({
data: newContacts,
});
}
// Prepare attributes for both new and existing contacts
const attributesUpsertForCreatedUsers = contactsToCreate.flatMap((contact, idx) =>
contact.attributes.map((attr) => ({
id: createId(),
contactId: newContacts[idx].id,
attributeKeyId: attributeKeyMap[attr.attributeKey.key],
value: attr.value,
createdAt: new Date(),
updatedAt: new Date(),
}))
);
const attributesUpsertForExistingUsers = contactsToUpdate.flatMap((contact) =>
contact.attributes.map((attr) => ({
id: attr.id,
contactId: contact.contactId,
attributeKeyId: attributeKeyMap[attr.attributeKey.key],
value: attr.value,
createdAt: attr.createdAt,
updatedAt: new Date(),
}))
);
const attributesToUpsert = [...attributesUpsertForCreatedUsers, ...attributesUpsertForExistingUsers];
// Skip the raw query if there are no attributes to upsert
if (attributesToUpsert.length > 0) {
// Use a raw query to perform a bulk insert with an ON CONFLICT clause
await tx.$executeRaw`
INSERT INTO "ContactAttribute" (
"id", "created_at", "updated_at", "contactId", "value", "attributeKeyId"
)
SELECT
unnest(${Prisma.sql`ARRAY[${attributesToUpsert.map((a) => a.id)}]`}),
unnest(${Prisma.sql`ARRAY[${attributesToUpsert.map((a) => a.createdAt)}]`}),
unnest(${Prisma.sql`ARRAY[${attributesToUpsert.map((a) => a.updatedAt)}]`}),
unnest(${Prisma.sql`ARRAY[${attributesToUpsert.map((a) => a.contactId)}]`}),
unnest(${Prisma.sql`ARRAY[${attributesToUpsert.map((a) => a.value)}]`}),
unnest(${Prisma.sql`ARRAY[${attributesToUpsert.map((a) => a.attributeKeyId)}]`})
ON CONFLICT ("contactId", "attributeKeyId") DO UPDATE SET
"value" = EXCLUDED."value",
"updated_at" = EXCLUDED."updated_at"
`;
}
contactCache.revalidate({
environmentId,
});
// revalidate all the new contacts:
for (const newContact of newContacts) {
contactCache.revalidate({
id: newContact.id,
});
}
// revalidate all the existing contacts:
for (const existingContact of existingContacts) {
contactCache.revalidate({
id: existingContact.id,
});
}
contactAttributeKeyCache.revalidate({
environmentId,
});
contactAttributeCache.revalidate({ environmentId });
});
};

View File

@@ -1,6 +1,7 @@
import { transformErrorToDetails } from "@/app/lib/api/validator";
import { responses } from "@/modules/api/v2/lib/response";
import { authenticateRequest } from "@/modules/api/v2/management/auth/authenticate-request";
import { upsertBulkContacts } from "@/modules/ee/contacts/api/bulk/lib/contact";
import { ZContactBulkUploadRequest } from "@/modules/ee/contacts/types/contact";
import { getIsContactsEnabled } from "@/modules/ee/license-check/lib/utils";
import { createId } from "@paralleldrive/cuid2";
@@ -122,290 +123,7 @@ export const PUT = async (request: NextRequest) => {
});
}
// Get unique attribute keys from the payload
const keys = Array.from(
new Set(filteredContacts.flatMap((contact) => contact.attributes.map((attr) => attr.attributeKey.key)))
);
// 2. Fetch attribute key records for these keys in this environment
const attributeKeys = await prisma.contactAttributeKey.findMany({
where: {
key: { in: keys },
environmentId,
},
});
const attributeKeyMap = attributeKeys.reduce<Record<string, string>>((acc, keyObj) => {
acc[keyObj.key] = keyObj.id;
return acc;
}, {});
// 2a. Check for missing attribute keys and create them if needed.
const missingKeysMap = new Map<string, { key: string; name: string }>();
filteredContacts.forEach((contact) => {
contact.attributes.forEach((attr) => {
// If the attribute key from the payload is not found, add it.
if (!attributeKeyMap[attr.attributeKey.key]) {
missingKeysMap.set(attr.attributeKey.key, attr.attributeKey);
}
});
});
if (missingKeysMap.size > 0) {
const missingKeysArray = Array.from(missingKeysMap.values());
// Create missing attribute keys in a batch
await prisma.contactAttributeKey.createMany({
data: missingKeysArray.map((keyObj) => ({
key: keyObj.key,
name: keyObj.name,
environmentId,
})),
skipDuplicates: true,
});
// Refresh the attribute key map for the missing keys
const newAttributeKeys = await prisma.contactAttributeKey.findMany({
where: {
key: { in: missingKeysArray.map((k) => k.key) },
environmentId,
},
select: { key: true, id: true },
});
newAttributeKeys.forEach((attrKey) => {
attributeKeyMap[attrKey.key] = attrKey.id;
});
}
// 3. Find existing contacts by matching email attribute
const existingContacts = await prisma.contact.findMany({
where: {
environmentId,
attributes: {
some: {
attributeKey: { key: emailKey },
value: { in: parsedEmails.data },
},
},
},
select: {
attributes: {
select: {
attributeKey: { select: { key: true } },
createdAt: true,
id: true,
value: true,
},
},
id: true,
},
});
// Build a map from email to contact id (if the email attribute exists)
const contactMap = new Map<
string,
{ contactId: string; attributes: { id: string; attributeKey: { key: string }; createdAt: Date }[] }
>();
existingContacts.forEach((contact) => {
const emailAttr = contact.attributes.find((attr) => attr.attributeKey.key === emailKey);
if (emailAttr) {
contactMap.set(emailAttr.value, {
contactId: contact.id,
attributes: contact.attributes.map((attr) => ({
id: attr.id,
attributeKey: { key: attr.attributeKey.key },
createdAt: attr.createdAt,
})),
});
}
});
// 4. Split contacts into ones to update and ones to create
const contactsToUpdate: {
contactId: string;
attributes: {
id: string;
createdAt: Date;
value: string;
attributeKey: {
key: string;
};
}[];
}[] = [];
const contactsToCreate: {
attributes: {
value: string;
attributeKey: {
key: string;
};
}[];
}[] = [];
for (const contact of filteredContacts) {
const emailAttr = contact.attributes.find((attr) => attr.attributeKey.key === emailKey);
if (emailAttr && contactMap.has(emailAttr.value)) {
contactsToUpdate.push({
contactId: contactMap.get(emailAttr.value)!.contactId,
attributes: contact.attributes.map((attr) => {
const existingAttr = contactMap
.get(emailAttr.value)!
.attributes.find((a) => a.attributeKey.key === attr.attributeKey.key);
if (!existingAttr) {
// Should never happen, just to be safe and satisfy typescript
return {
id: createId(),
createdAt: new Date(),
value: attr.value,
attributeKey: attr.attributeKey,
};
}
return {
id: existingAttr.id,
createdAt: existingAttr.createdAt,
value: attr.value,
attributeKey: attr.attributeKey,
};
}),
});
} else {
contactsToCreate.push(contact);
}
}
// 5. Execute in a transaction
await prisma.$transaction(async (tx) => {
// Create new contacts (one-by-one with nested writes)
// Note: prisma.contact.createMany does not support nested writes.
// for (const contact of contactsToCreate) {
// await tx.contact.create({
// data: {
// environmentId,
// // You can set other Contact fields here if needed
// attributes: {
// create: contact.attributes.map((attr) => ({
// value: attr.value,
// // Connect to an existing attributeKey via its id
// attributeKey: { connect: { id: attributeKeyMap[attr.attributeKey.key] } },
// })),
// },
// },
// });
// }
// if (contactsToCreate.length > 0) {
const newContacts = contactsToCreate.map(() => ({
id: createId(),
environmentId,
}));
await tx.contact.createMany({
data: newContacts,
});
// Build attribute records using the same pre-generated contact id
const attributesToCreate = contactsToCreate.flatMap((contact, idx) =>
contact.attributes.map((attr) => ({
id: createId(), // generate id for attribute
contactId: newContacts[idx].id, // use your pre-computed id
attributeKeyId: attributeKeyMap[attr.attributeKey.key],
value: attr.value,
createdAt: new Date(),
updatedAt: new Date(),
}))
);
const attributesToUpdate = contactsToUpdate.flatMap((contact) =>
contact.attributes.map((attr) => ({
id: attr.id,
contactId: contact.contactId,
attributeKeyId: attributeKeyMap[attr.attributeKey.key],
value: attr.value,
createdAt: attr.createdAt,
updatedAt: new Date(),
}))
);
const attributesToUpsert = [...attributesToCreate, ...attributesToUpdate];
// Use a raw query to perform a bulk insert with an ON CONFLICT clause if you need upsert logic.
await tx.$executeRaw`
INSERT INTO "ContactAttribute" (
"id", "created_at", "updated_at", "contactId", "value", "attributeKeyId"
)
SELECT
unnest(${Prisma.sql`ARRAY[${attributesToUpsert.map((a) => a.id)}]`}),
unnest(${Prisma.sql`ARRAY[${attributesToUpsert.map((a) => a.createdAt)}]`}),
unnest(${Prisma.sql`ARRAY[${attributesToUpsert.map((a) => a.updatedAt)}]`}),
unnest(${Prisma.sql`ARRAY[${attributesToUpsert.map((a) => a.contactId)}]`}),
unnest(${Prisma.sql`ARRAY[${attributesToUpsert.map((a) => a.value)}]`}),
unnest(${Prisma.sql`ARRAY[${attributesToUpsert.map((a) => a.attributeKeyId)}]`})
ON CONFLICT ("contactId", "attributeKeyId") DO UPDATE SET
"value" = EXCLUDED."value",
"updated_at" = EXCLUDED."updated_at"
`;
// }
// For contacts that exist, upsert each attribute individually.
// This leverages the unique constraint on (contactId, attributeKeyId).
// for (const contact of contactsToUpdate) {
// for (const attr of contact.attributes) {
// await tx.contactAttribute.upsert({
// where: {
// contactId_attributeKeyId: {
// contactId: contact.contactId,
// attributeKeyId: attributeKeyMap[attr.attributeKey.key],
// },
// },
// update: {
// value: attr.value,
// },
// create: {
// contactId: contact.contactId,
// attributeKeyId: attributeKeyMap[attr.attributeKey.key],
// value: attr.value,
// },
// });
// }
// }
// if (contactsToUpdate.length > 0) {
// // Build an array for update attributes from contactsToUpdate
// const updateAttributes = contactsToUpdate.flatMap((contact) =>
// contact.attributes.map((attr) => ({
// id: createId(), // Generate a new id for insertion (will be ignored if a conflict occurs)
// contactId: contact.contactId,
// attributeKeyId: attributeKeyMap[attr.attributeKey.key],
// value: attr.value,
// createdAt: new Date(),
// updatedAt: new Date(),
// }))
// );
// // Execute a single raw SQL query to upsert these attributes in bulk
// await tx.$executeRaw`
// INSERT INTO "ContactAttribute" (
// "id", "created_at", "updated_at", "contactId", "value", "attributeKeyId"
// )
// SELECT
// unnest(${Prisma.sql`ARRAY[${updateAttributes.map((a) => a.id)}]`}),
// unnest(${Prisma.sql`ARRAY[${updateAttributes.map((a) => a.createdAt)}]`}),
// unnest(${Prisma.sql`ARRAY[${updateAttributes.map((a) => a.updatedAt)}]`}),
// unnest(${Prisma.sql`ARRAY[${updateAttributes.map((a) => a.contactId)}]`}),
// unnest(${Prisma.sql`ARRAY[${updateAttributes.map((a) => a.value)}]`}),
// unnest(${Prisma.sql`ARRAY[${updateAttributes.map((a) => a.attributeKeyId)}]`})
// ON CONFLICT ("contactId", "attributeKeyId") DO UPDATE SET
// "value" = EXCLUDED."value",
// "updated_at" = EXCLUDED."updated_at"
// `;
// }
});
await upsertBulkContacts(filteredContacts, environmentId, parsedEmails.data);
return responses.successResponse({
data: {

View File

@@ -108,21 +108,27 @@ export const ZContactCSVAttributeMap = z.record(z.string(), z.string()).superRef
});
export type TContactCSVAttributeMap = z.infer<typeof ZContactCSVAttributeMap>;
export const ZContactBulkUploadAttributeKey = z.object({
key: z.string(),
name: z.string(),
});
export type TContactBulkUploadAttributeKey = z.infer<typeof ZContactBulkUploadAttributeKey>;
export const ZContactBulkUploadAttribute = z.object({
attributeKey: ZContactBulkUploadAttributeKey,
value: z.string(),
});
export const ZContactBulkUploadContact = z.object({
attributes: z.array(ZContactBulkUploadAttribute),
});
export type TContactBulkUploadContact = z.infer<typeof ZContactBulkUploadContact>;
export const ZContactBulkUploadRequest = z.object({
contacts: z
.array(
z.object({
attributes: z.array(
z.object({
attributeKey: z.object({
key: z.string(),
name: z.string(),
}),
value: z.string(),
})
),
})
)
.array(ZContactBulkUploadContact)
.max(1000, { message: "Maximum 1000 contacts allowed at a time." })
.superRefine((contacts, ctx) => {
// every contact must have an email attribute

View File

@@ -237,6 +237,7 @@
"maximum": "Maximal",
"member": "Mitglied",
"members": "Mitglieder",
"membership_not_found": "Mitgliedschaft nicht gefunden",
"metadata": "Metadaten",
"minimum": "Minimum",
"mobile_overlay_text": "Formbricks ist für Geräte mit kleineren Auflösungen nicht verfügbar.",
@@ -292,7 +293,6 @@
"privacy": "Datenschutz",
"privacy_policy": "Datenschutzerklärung",
"product_manager": "Produktmanager",
"product_not_found": "Produkt nicht gefunden",
"profile": "Profil",
"project": "Projekt",
"project_configuration": "Projektkonfiguration",

View File

@@ -237,6 +237,7 @@
"maximum": "Maximum",
"member": "Member",
"members": "Members",
"membership_not_found": "Membership not found",
"metadata": "Metadata",
"minimum": "Minimum",
"mobile_overlay_text": "Formbricks is not available for devices with smaller resolutions.",
@@ -292,7 +293,6 @@
"privacy": "Privacy Policy",
"privacy_policy": "Privacy Policy",
"product_manager": "Product Manager",
"product_not_found": "Product not found",
"profile": "Profile",
"project": "Project",
"project_configuration": "Project's Configuration",

View File

@@ -237,6 +237,7 @@
"maximum": "Max",
"member": "Membre",
"members": "Membres",
"membership_not_found": "Abonnement non trouvé",
"metadata": "Métadonnées",
"minimum": "Min",
"mobile_overlay_text": "Formbricks n'est pas disponible pour les appareils avec des résolutions plus petites.",
@@ -292,7 +293,6 @@
"privacy": "Politique de confidentialité",
"privacy_policy": "Politique de confidentialité",
"product_manager": "Chef de produit",
"product_not_found": "Produit non trouvé",
"profile": "Profil",
"project": "Projet",
"project_configuration": "Configuration du projet",

View File

@@ -237,6 +237,7 @@
"maximum": "Máximo",
"member": "Membros",
"members": "Membros",
"membership_not_found": "Assinatura não encontrada",
"metadata": "metadados",
"minimum": "Mínimo",
"mobile_overlay_text": "O Formbricks não está disponível para dispositivos com resoluções menores.",
@@ -292,7 +293,6 @@
"privacy": "Política de Privacidade",
"privacy_policy": "Política de Privacidade",
"product_manager": "Gerente de Produto",
"product_not_found": "Produto não encontrado",
"profile": "Perfil",
"project": "Projeto",
"project_configuration": "Configuração do Projeto",

View File

@@ -237,6 +237,7 @@
"maximum": "Máximo",
"member": "Membro",
"members": "Membros",
"membership_not_found": "Associação não encontrada",
"metadata": "Metadados",
"minimum": "Mínimo",
"mobile_overlay_text": "O Formbricks não está disponível para dispositivos com resoluções menores.",
@@ -292,7 +293,6 @@
"privacy": "Política de Privacidade",
"privacy_policy": "Política de Privacidade",
"product_manager": "Gestor de Produto",
"product_not_found": "Produto não encontrado",
"profile": "Perfil",
"project": "Projeto",
"project_configuration": "Configuração do Projeto",

View File

@@ -237,6 +237,7 @@
"maximum": "最大值",
"member": "成員",
"members": "成員",
"membership_not_found": "找不到成員資格",
"metadata": "元數據",
"minimum": "最小值",
"mobile_overlay_text": "Formbricks 不適用於較小解析度的裝置。",
@@ -292,7 +293,6 @@
"privacy": "隱私權政策",
"privacy_policy": "隱私權政策",
"product_manager": "產品經理",
"product_not_found": "找不到產品",
"profile": "個人資料",
"project": "專案",
"project_configuration": "專案組態",