Compare commits

...

2 Commits

Author SHA1 Message Date
pandeymangg
9c0306c27b Merge branch 'main' into fix/bulk-contacts 2025-04-18 15:03:11 +05:30
pandeymangg
62960686a1 fix: bulk contacts refactors 2025-04-08 11:32:00 +05:30
2 changed files with 103 additions and 100 deletions

View File

@@ -229,51 +229,52 @@ export const upsertBulkContacts = async (
try { try {
// Execute everything in ONE transaction // Execute everything in ONE transaction
await prisma.$transaction(async (tx) => { await prisma.$transaction(
const attributeKeyMap = existingAttributeKeys.reduce<Record<string, string>>((acc, keyObj) => { async (tx) => {
acc[keyObj.key] = keyObj.id; const attributeKeyMap = existingAttributeKeys.reduce<Record<string, string>>((acc, keyObj) => {
return acc; acc[keyObj.key] = keyObj.id;
}, {}); return acc;
}, {});
// Check for missing attribute keys and create them if needed. // Check for missing attribute keys and create them if needed.
const missingKeysMap = new Map<string, { key: string; name: string }>(); const missingKeysMap = new Map<string, { key: string; name: string }>();
const attributeKeyNameUpdates = new Map<string, { key: string; name: string }>(); const attributeKeyNameUpdates = new Map<string, { key: string; name: string }>();
for (const contact of filteredContacts) { for (const contact of filteredContacts) {
for (const attr of contact.attributes) { for (const attr of contact.attributes) {
if (!attributeKeyMap[attr.attributeKey.key]) { if (!attributeKeyMap[attr.attributeKey.key]) {
missingKeysMap.set(attr.attributeKey.key, attr.attributeKey); missingKeysMap.set(attr.attributeKey.key, attr.attributeKey);
} else { } else {
// Check if the name has changed for existing attribute keys // Check if the name has changed for existing attribute keys
const existingKey = existingAttributeKeys.find((ak) => ak.key === attr.attributeKey.key); const existingKey = existingAttributeKeys.find((ak) => ak.key === attr.attributeKey.key);
if (existingKey && existingKey.name !== attr.attributeKey.name) { if (existingKey && existingKey.name !== attr.attributeKey.name) {
attributeKeyNameUpdates.set(attr.attributeKey.key, attr.attributeKey); attributeKeyNameUpdates.set(attr.attributeKey.key, attr.attributeKey);
}
} }
} }
} }
}
// Handle both missing keys and name updates in a single batch operation // Handle both missing keys and name updates in a single batch operation
const keysToUpsert = new Map<string, { key: string; name: string }>(); const keysToUpsert = new Map<string, { key: string; name: string }>();
// Collect all keys that need to be created or updated // Collect all keys that need to be created or updated
for (const [key, value] of missingKeysMap) { for (const [key, value] of missingKeysMap) {
keysToUpsert.set(key, value); keysToUpsert.set(key, value);
} }
for (const [key, value] of attributeKeyNameUpdates) { for (const [key, value] of attributeKeyNameUpdates) {
keysToUpsert.set(key, value); keysToUpsert.set(key, value);
} }
if (keysToUpsert.size > 0) { if (keysToUpsert.size > 0) {
const keysArray = Array.from(keysToUpsert.values()); const keysArray = Array.from(keysToUpsert.values());
const BATCH_SIZE = 10000; const BATCH_SIZE = 10000;
for (let i = 0; i < keysArray.length; i += BATCH_SIZE) { for (let i = 0; i < keysArray.length; i += BATCH_SIZE) {
const batch = keysArray.slice(i, i + BATCH_SIZE); const batch = keysArray.slice(i, i + BATCH_SIZE);
// Use raw query to perform upsert // Use raw query to perform upsert
const upsertedKeys = await tx.$queryRaw<{ id: string; key: string }[]>` const upsertedKeys = await tx.$queryRaw<{ id: string; key: string }[]>`
INSERT INTO "ContactAttributeKey" ("id", "key", "name", "environmentId", "created_at", "updated_at") INSERT INTO "ContactAttributeKey" ("id", "key", "name", "environmentId", "created_at", "updated_at")
SELECT SELECT
unnest(${Prisma.sql`ARRAY[${batch.map(() => createId())}]`}), unnest(${Prisma.sql`ARRAY[${batch.map(() => createId())}]`}),
@@ -289,59 +290,59 @@ export const upsertBulkContacts = async (
RETURNING "id", "key" RETURNING "id", "key"
`; `;
// Update attribute key map with upserted keys // Update attribute key map with upserted keys
for (const key of upsertedKeys) { for (const key of upsertedKeys) {
attributeKeyMap[key.key] = key.id; attributeKeyMap[key.key] = key.id;
}
} }
} }
}
// Create new contacts -- should be at most 1000, no need to batch // Create new contacts -- should be at most 1000, no need to batch
const newContacts = contactsToCreate.map(() => ({ const newContacts = contactsToCreate.map(() => ({
id: createId(),
environmentId,
}));
if (newContacts.length > 0) {
await tx.contact.createMany({
data: newContacts,
});
}
// Prepare attributes for both new and existing contacts
const attributesUpsertForCreatedUsers = contactsToCreate.flatMap((contact, idx) =>
contact.attributes.map((attr) => ({
id: createId(), id: createId(),
contactId: newContacts[idx].id, environmentId,
attributeKeyId: attributeKeyMap[attr.attributeKey.key], }));
value: attr.value,
createdAt: new Date(),
updatedAt: new Date(),
}))
);
const attributesUpsertForExistingUsers = contactsToUpdate.flatMap((contact) => if (newContacts.length > 0) {
contact.attributes.map((attr) => ({ await tx.contact.createMany({
id: attr.id, data: newContacts,
contactId: contact.contactId, });
attributeKeyId: attributeKeyMap[attr.attributeKey.key], }
value: attr.value,
createdAt: attr.createdAt,
updatedAt: new Date(),
}))
);
const attributesToUpsert = [...attributesUpsertForCreatedUsers, ...attributesUpsertForExistingUsers]; // Prepare attributes for both new and existing contacts
const attributesUpsertForCreatedUsers = contactsToCreate.flatMap((contact, idx) =>
contact.attributes.map((attr) => ({
id: createId(),
contactId: newContacts[idx].id,
attributeKeyId: attributeKeyMap[attr.attributeKey.key],
value: attr.value,
createdAt: new Date(),
updatedAt: new Date(),
}))
);
// Skip the raw query if there are no attributes to upsert const attributesUpsertForExistingUsers = contactsToUpdate.flatMap((contact) =>
if (attributesToUpsert.length > 0) { contact.attributes.map((attr) => ({
// Process attributes in batches of 10,000 id: attr.id,
const BATCH_SIZE = 10000; contactId: contact.contactId,
for (let i = 0; i < attributesToUpsert.length; i += BATCH_SIZE) { attributeKeyId: attributeKeyMap[attr.attributeKey.key],
const batch = attributesToUpsert.slice(i, i + BATCH_SIZE); value: attr.value,
createdAt: attr.createdAt,
updatedAt: new Date(),
}))
);
// Use a raw query to perform a bulk insert with an ON CONFLICT clause const attributesToUpsert = [...attributesUpsertForCreatedUsers, ...attributesUpsertForExistingUsers];
await tx.$executeRaw`
// Skip the raw query if there are no attributes to upsert
if (attributesToUpsert.length > 0) {
// Process attributes in batches of 10,000
const BATCH_SIZE = 10000;
for (let i = 0; i < attributesToUpsert.length; i += BATCH_SIZE) {
const batch = attributesToUpsert.slice(i, i + BATCH_SIZE);
// Use a raw query to perform a bulk insert with an ON CONFLICT clause
await tx.$executeRaw`
INSERT INTO "ContactAttribute" ( INSERT INTO "ContactAttribute" (
"id", "created_at", "updated_at", "contactId", "value", "attributeKeyId" "id", "created_at", "updated_at", "contactId", "value", "attributeKeyId"
) )
@@ -356,33 +357,35 @@ export const upsertBulkContacts = async (
"value" = EXCLUDED."value", "value" = EXCLUDED."value",
"updated_at" = EXCLUDED."updated_at" "updated_at" = EXCLUDED."updated_at"
`; `;
}
} }
}
contactCache.revalidate({
environmentId,
});
// revalidate all the new contacts:
for (const newContact of newContacts) {
contactCache.revalidate({ contactCache.revalidate({
id: newContact.id, environmentId,
}); });
}
// revalidate all the existing contacts: // revalidate all the new contacts:
for (const existingContact of existingContactsByEmail) { for (const newContact of newContacts) {
contactCache.revalidate({ contactCache.revalidate({
id: existingContact.id, id: newContact.id,
});
}
// revalidate all the existing contacts:
for (const existingContact of existingContactsByEmail) {
contactCache.revalidate({
id: existingContact.id,
});
}
contactAttributeKeyCache.revalidate({
environmentId,
}); });
}
contactAttributeKeyCache.revalidate({ contactAttributeCache.revalidate({ environmentId });
environmentId, },
}); { timeout: 10 * 1000 } // 10 seconds timeout
);
contactAttributeCache.revalidate({ environmentId });
});
return ok({ return ok({
contactIdxWithConflictingUserIds, contactIdxWithConflictingUserIds,

View File

@@ -126,7 +126,7 @@ export const ZContactBulkUploadRequest = z.object({
environmentId: z.string().cuid2(), environmentId: z.string().cuid2(),
contacts: z contacts: z
.array(ZContactBulkUploadContact) .array(ZContactBulkUploadContact)
.max(1000, { message: "Maximum 1000 contacts allowed at a time." }) .max(250, { message: "Maximum 250 contacts allowed at a time." })
.superRefine((contacts, ctx) => { .superRefine((contacts, ctx) => {
// Track all data in a single pass // Track all data in a single pass
const seenEmails = new Set<string>(); const seenEmails = new Set<string>();