Compare commits

...

2 Commits

Author SHA1 Message Date
pandeymangg
9c0306c27b Merge branch 'main' into fix/bulk-contacts 2025-04-18 15:03:11 +05:30
pandeymangg
62960686a1 fix: bulk contacts refactors 2025-04-08 11:32:00 +05:30
2 changed files with 103 additions and 100 deletions

View File

@@ -229,51 +229,52 @@ export const upsertBulkContacts = async (
try {
// Execute everything in ONE transaction
await prisma.$transaction(async (tx) => {
const attributeKeyMap = existingAttributeKeys.reduce<Record<string, string>>((acc, keyObj) => {
acc[keyObj.key] = keyObj.id;
return acc;
}, {});
await prisma.$transaction(
async (tx) => {
const attributeKeyMap = existingAttributeKeys.reduce<Record<string, string>>((acc, keyObj) => {
acc[keyObj.key] = keyObj.id;
return acc;
}, {});
// Check for missing attribute keys and create them if needed.
const missingKeysMap = new Map<string, { key: string; name: string }>();
const attributeKeyNameUpdates = new Map<string, { key: string; name: string }>();
// Check for missing attribute keys and create them if needed.
const missingKeysMap = new Map<string, { key: string; name: string }>();
const attributeKeyNameUpdates = new Map<string, { key: string; name: string }>();
for (const contact of filteredContacts) {
for (const attr of contact.attributes) {
if (!attributeKeyMap[attr.attributeKey.key]) {
missingKeysMap.set(attr.attributeKey.key, attr.attributeKey);
} else {
// Check if the name has changed for existing attribute keys
const existingKey = existingAttributeKeys.find((ak) => ak.key === attr.attributeKey.key);
if (existingKey && existingKey.name !== attr.attributeKey.name) {
attributeKeyNameUpdates.set(attr.attributeKey.key, attr.attributeKey);
for (const contact of filteredContacts) {
for (const attr of contact.attributes) {
if (!attributeKeyMap[attr.attributeKey.key]) {
missingKeysMap.set(attr.attributeKey.key, attr.attributeKey);
} else {
// Check if the name has changed for existing attribute keys
const existingKey = existingAttributeKeys.find((ak) => ak.key === attr.attributeKey.key);
if (existingKey && existingKey.name !== attr.attributeKey.name) {
attributeKeyNameUpdates.set(attr.attributeKey.key, attr.attributeKey);
}
}
}
}
}
// Handle both missing keys and name updates in a single batch operation
const keysToUpsert = new Map<string, { key: string; name: string }>();
// Handle both missing keys and name updates in a single batch operation
const keysToUpsert = new Map<string, { key: string; name: string }>();
// Collect all keys that need to be created or updated
for (const [key, value] of missingKeysMap) {
keysToUpsert.set(key, value);
}
// Collect all keys that need to be created or updated
for (const [key, value] of missingKeysMap) {
keysToUpsert.set(key, value);
}
for (const [key, value] of attributeKeyNameUpdates) {
keysToUpsert.set(key, value);
}
for (const [key, value] of attributeKeyNameUpdates) {
keysToUpsert.set(key, value);
}
if (keysToUpsert.size > 0) {
const keysArray = Array.from(keysToUpsert.values());
const BATCH_SIZE = 10000;
if (keysToUpsert.size > 0) {
const keysArray = Array.from(keysToUpsert.values());
const BATCH_SIZE = 10000;
for (let i = 0; i < keysArray.length; i += BATCH_SIZE) {
const batch = keysArray.slice(i, i + BATCH_SIZE);
for (let i = 0; i < keysArray.length; i += BATCH_SIZE) {
const batch = keysArray.slice(i, i + BATCH_SIZE);
// Use raw query to perform upsert
const upsertedKeys = await tx.$queryRaw<{ id: string; key: string }[]>`
// Use raw query to perform upsert
const upsertedKeys = await tx.$queryRaw<{ id: string; key: string }[]>`
INSERT INTO "ContactAttributeKey" ("id", "key", "name", "environmentId", "created_at", "updated_at")
SELECT
unnest(${Prisma.sql`ARRAY[${batch.map(() => createId())}]`}),
@@ -289,59 +290,59 @@ export const upsertBulkContacts = async (
RETURNING "id", "key"
`;
// Update attribute key map with upserted keys
for (const key of upsertedKeys) {
attributeKeyMap[key.key] = key.id;
// Update attribute key map with upserted keys
for (const key of upsertedKeys) {
attributeKeyMap[key.key] = key.id;
}
}
}
}
// Create new contacts -- should be at most 1000, no need to batch
const newContacts = contactsToCreate.map(() => ({
id: createId(),
environmentId,
}));
if (newContacts.length > 0) {
await tx.contact.createMany({
data: newContacts,
});
}
// Prepare attributes for both new and existing contacts
const attributesUpsertForCreatedUsers = contactsToCreate.flatMap((contact, idx) =>
contact.attributes.map((attr) => ({
// Create new contacts -- should be at most 1000, no need to batch
const newContacts = contactsToCreate.map(() => ({
id: createId(),
contactId: newContacts[idx].id,
attributeKeyId: attributeKeyMap[attr.attributeKey.key],
value: attr.value,
createdAt: new Date(),
updatedAt: new Date(),
}))
);
environmentId,
}));
const attributesUpsertForExistingUsers = contactsToUpdate.flatMap((contact) =>
contact.attributes.map((attr) => ({
id: attr.id,
contactId: contact.contactId,
attributeKeyId: attributeKeyMap[attr.attributeKey.key],
value: attr.value,
createdAt: attr.createdAt,
updatedAt: new Date(),
}))
);
if (newContacts.length > 0) {
await tx.contact.createMany({
data: newContacts,
});
}
const attributesToUpsert = [...attributesUpsertForCreatedUsers, ...attributesUpsertForExistingUsers];
// Prepare attributes for both new and existing contacts
const attributesUpsertForCreatedUsers = contactsToCreate.flatMap((contact, idx) =>
contact.attributes.map((attr) => ({
id: createId(),
contactId: newContacts[idx].id,
attributeKeyId: attributeKeyMap[attr.attributeKey.key],
value: attr.value,
createdAt: new Date(),
updatedAt: new Date(),
}))
);
// Skip the raw query if there are no attributes to upsert
if (attributesToUpsert.length > 0) {
// Process attributes in batches of 10,000
const BATCH_SIZE = 10000;
for (let i = 0; i < attributesToUpsert.length; i += BATCH_SIZE) {
const batch = attributesToUpsert.slice(i, i + BATCH_SIZE);
const attributesUpsertForExistingUsers = contactsToUpdate.flatMap((contact) =>
contact.attributes.map((attr) => ({
id: attr.id,
contactId: contact.contactId,
attributeKeyId: attributeKeyMap[attr.attributeKey.key],
value: attr.value,
createdAt: attr.createdAt,
updatedAt: new Date(),
}))
);
// Use a raw query to perform a bulk insert with an ON CONFLICT clause
await tx.$executeRaw`
const attributesToUpsert = [...attributesUpsertForCreatedUsers, ...attributesUpsertForExistingUsers];
// Skip the raw query if there are no attributes to upsert
if (attributesToUpsert.length > 0) {
// Process attributes in batches of 10,000
const BATCH_SIZE = 10000;
for (let i = 0; i < attributesToUpsert.length; i += BATCH_SIZE) {
const batch = attributesToUpsert.slice(i, i + BATCH_SIZE);
// Use a raw query to perform a bulk insert with an ON CONFLICT clause
await tx.$executeRaw`
INSERT INTO "ContactAttribute" (
"id", "created_at", "updated_at", "contactId", "value", "attributeKeyId"
)
@@ -356,33 +357,35 @@ export const upsertBulkContacts = async (
"value" = EXCLUDED."value",
"updated_at" = EXCLUDED."updated_at"
`;
}
}
}
contactCache.revalidate({
environmentId,
});
// revalidate all the new contacts:
for (const newContact of newContacts) {
contactCache.revalidate({
id: newContact.id,
environmentId,
});
}
// revalidate all the existing contacts:
for (const existingContact of existingContactsByEmail) {
contactCache.revalidate({
id: existingContact.id,
// revalidate all the new contacts:
for (const newContact of newContacts) {
contactCache.revalidate({
id: newContact.id,
});
}
// revalidate all the existing contacts:
for (const existingContact of existingContactsByEmail) {
contactCache.revalidate({
id: existingContact.id,
});
}
contactAttributeKeyCache.revalidate({
environmentId,
});
}
contactAttributeKeyCache.revalidate({
environmentId,
});
contactAttributeCache.revalidate({ environmentId });
});
contactAttributeCache.revalidate({ environmentId });
},
{ timeout: 10 * 1000 } // 10 seconds timeout
);
return ok({
contactIdxWithConflictingUserIds,

View File

@@ -126,7 +126,7 @@ export const ZContactBulkUploadRequest = z.object({
environmentId: z.string().cuid2(),
contacts: z
.array(ZContactBulkUploadContact)
.max(1000, { message: "Maximum 1000 contacts allowed at a time." })
.max(250, { message: "Maximum 250 contacts allowed at a time." })
.superRefine((contacts, ctx) => {
// Track all data in a single pass
const seenEmails = new Set<string>();