mirror of
https://github.com/formbricks/formbricks.git
synced 2025-12-23 06:30:51 -06:00
Compare commits
2 Commits
docs/formb
...
fix/bulk-c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9c0306c27b | ||
|
|
62960686a1 |
@@ -229,51 +229,52 @@ export const upsertBulkContacts = async (
|
||||
|
||||
try {
|
||||
// Execute everything in ONE transaction
|
||||
await prisma.$transaction(async (tx) => {
|
||||
const attributeKeyMap = existingAttributeKeys.reduce<Record<string, string>>((acc, keyObj) => {
|
||||
acc[keyObj.key] = keyObj.id;
|
||||
return acc;
|
||||
}, {});
|
||||
await prisma.$transaction(
|
||||
async (tx) => {
|
||||
const attributeKeyMap = existingAttributeKeys.reduce<Record<string, string>>((acc, keyObj) => {
|
||||
acc[keyObj.key] = keyObj.id;
|
||||
return acc;
|
||||
}, {});
|
||||
|
||||
// Check for missing attribute keys and create them if needed.
|
||||
const missingKeysMap = new Map<string, { key: string; name: string }>();
|
||||
const attributeKeyNameUpdates = new Map<string, { key: string; name: string }>();
|
||||
// Check for missing attribute keys and create them if needed.
|
||||
const missingKeysMap = new Map<string, { key: string; name: string }>();
|
||||
const attributeKeyNameUpdates = new Map<string, { key: string; name: string }>();
|
||||
|
||||
for (const contact of filteredContacts) {
|
||||
for (const attr of contact.attributes) {
|
||||
if (!attributeKeyMap[attr.attributeKey.key]) {
|
||||
missingKeysMap.set(attr.attributeKey.key, attr.attributeKey);
|
||||
} else {
|
||||
// Check if the name has changed for existing attribute keys
|
||||
const existingKey = existingAttributeKeys.find((ak) => ak.key === attr.attributeKey.key);
|
||||
if (existingKey && existingKey.name !== attr.attributeKey.name) {
|
||||
attributeKeyNameUpdates.set(attr.attributeKey.key, attr.attributeKey);
|
||||
for (const contact of filteredContacts) {
|
||||
for (const attr of contact.attributes) {
|
||||
if (!attributeKeyMap[attr.attributeKey.key]) {
|
||||
missingKeysMap.set(attr.attributeKey.key, attr.attributeKey);
|
||||
} else {
|
||||
// Check if the name has changed for existing attribute keys
|
||||
const existingKey = existingAttributeKeys.find((ak) => ak.key === attr.attributeKey.key);
|
||||
if (existingKey && existingKey.name !== attr.attributeKey.name) {
|
||||
attributeKeyNameUpdates.set(attr.attributeKey.key, attr.attributeKey);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Handle both missing keys and name updates in a single batch operation
|
||||
const keysToUpsert = new Map<string, { key: string; name: string }>();
|
||||
// Handle both missing keys and name updates in a single batch operation
|
||||
const keysToUpsert = new Map<string, { key: string; name: string }>();
|
||||
|
||||
// Collect all keys that need to be created or updated
|
||||
for (const [key, value] of missingKeysMap) {
|
||||
keysToUpsert.set(key, value);
|
||||
}
|
||||
// Collect all keys that need to be created or updated
|
||||
for (const [key, value] of missingKeysMap) {
|
||||
keysToUpsert.set(key, value);
|
||||
}
|
||||
|
||||
for (const [key, value] of attributeKeyNameUpdates) {
|
||||
keysToUpsert.set(key, value);
|
||||
}
|
||||
for (const [key, value] of attributeKeyNameUpdates) {
|
||||
keysToUpsert.set(key, value);
|
||||
}
|
||||
|
||||
if (keysToUpsert.size > 0) {
|
||||
const keysArray = Array.from(keysToUpsert.values());
|
||||
const BATCH_SIZE = 10000;
|
||||
if (keysToUpsert.size > 0) {
|
||||
const keysArray = Array.from(keysToUpsert.values());
|
||||
const BATCH_SIZE = 10000;
|
||||
|
||||
for (let i = 0; i < keysArray.length; i += BATCH_SIZE) {
|
||||
const batch = keysArray.slice(i, i + BATCH_SIZE);
|
||||
for (let i = 0; i < keysArray.length; i += BATCH_SIZE) {
|
||||
const batch = keysArray.slice(i, i + BATCH_SIZE);
|
||||
|
||||
// Use raw query to perform upsert
|
||||
const upsertedKeys = await tx.$queryRaw<{ id: string; key: string }[]>`
|
||||
// Use raw query to perform upsert
|
||||
const upsertedKeys = await tx.$queryRaw<{ id: string; key: string }[]>`
|
||||
INSERT INTO "ContactAttributeKey" ("id", "key", "name", "environmentId", "created_at", "updated_at")
|
||||
SELECT
|
||||
unnest(${Prisma.sql`ARRAY[${batch.map(() => createId())}]`}),
|
||||
@@ -289,59 +290,59 @@ export const upsertBulkContacts = async (
|
||||
RETURNING "id", "key"
|
||||
`;
|
||||
|
||||
// Update attribute key map with upserted keys
|
||||
for (const key of upsertedKeys) {
|
||||
attributeKeyMap[key.key] = key.id;
|
||||
// Update attribute key map with upserted keys
|
||||
for (const key of upsertedKeys) {
|
||||
attributeKeyMap[key.key] = key.id;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Create new contacts -- should be at most 1000, no need to batch
|
||||
const newContacts = contactsToCreate.map(() => ({
|
||||
id: createId(),
|
||||
environmentId,
|
||||
}));
|
||||
|
||||
if (newContacts.length > 0) {
|
||||
await tx.contact.createMany({
|
||||
data: newContacts,
|
||||
});
|
||||
}
|
||||
|
||||
// Prepare attributes for both new and existing contacts
|
||||
const attributesUpsertForCreatedUsers = contactsToCreate.flatMap((contact, idx) =>
|
||||
contact.attributes.map((attr) => ({
|
||||
// Create new contacts -- should be at most 1000, no need to batch
|
||||
const newContacts = contactsToCreate.map(() => ({
|
||||
id: createId(),
|
||||
contactId: newContacts[idx].id,
|
||||
attributeKeyId: attributeKeyMap[attr.attributeKey.key],
|
||||
value: attr.value,
|
||||
createdAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
}))
|
||||
);
|
||||
environmentId,
|
||||
}));
|
||||
|
||||
const attributesUpsertForExistingUsers = contactsToUpdate.flatMap((contact) =>
|
||||
contact.attributes.map((attr) => ({
|
||||
id: attr.id,
|
||||
contactId: contact.contactId,
|
||||
attributeKeyId: attributeKeyMap[attr.attributeKey.key],
|
||||
value: attr.value,
|
||||
createdAt: attr.createdAt,
|
||||
updatedAt: new Date(),
|
||||
}))
|
||||
);
|
||||
if (newContacts.length > 0) {
|
||||
await tx.contact.createMany({
|
||||
data: newContacts,
|
||||
});
|
||||
}
|
||||
|
||||
const attributesToUpsert = [...attributesUpsertForCreatedUsers, ...attributesUpsertForExistingUsers];
|
||||
// Prepare attributes for both new and existing contacts
|
||||
const attributesUpsertForCreatedUsers = contactsToCreate.flatMap((contact, idx) =>
|
||||
contact.attributes.map((attr) => ({
|
||||
id: createId(),
|
||||
contactId: newContacts[idx].id,
|
||||
attributeKeyId: attributeKeyMap[attr.attributeKey.key],
|
||||
value: attr.value,
|
||||
createdAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
}))
|
||||
);
|
||||
|
||||
// Skip the raw query if there are no attributes to upsert
|
||||
if (attributesToUpsert.length > 0) {
|
||||
// Process attributes in batches of 10,000
|
||||
const BATCH_SIZE = 10000;
|
||||
for (let i = 0; i < attributesToUpsert.length; i += BATCH_SIZE) {
|
||||
const batch = attributesToUpsert.slice(i, i + BATCH_SIZE);
|
||||
const attributesUpsertForExistingUsers = contactsToUpdate.flatMap((contact) =>
|
||||
contact.attributes.map((attr) => ({
|
||||
id: attr.id,
|
||||
contactId: contact.contactId,
|
||||
attributeKeyId: attributeKeyMap[attr.attributeKey.key],
|
||||
value: attr.value,
|
||||
createdAt: attr.createdAt,
|
||||
updatedAt: new Date(),
|
||||
}))
|
||||
);
|
||||
|
||||
// Use a raw query to perform a bulk insert with an ON CONFLICT clause
|
||||
await tx.$executeRaw`
|
||||
const attributesToUpsert = [...attributesUpsertForCreatedUsers, ...attributesUpsertForExistingUsers];
|
||||
|
||||
// Skip the raw query if there are no attributes to upsert
|
||||
if (attributesToUpsert.length > 0) {
|
||||
// Process attributes in batches of 10,000
|
||||
const BATCH_SIZE = 10000;
|
||||
for (let i = 0; i < attributesToUpsert.length; i += BATCH_SIZE) {
|
||||
const batch = attributesToUpsert.slice(i, i + BATCH_SIZE);
|
||||
|
||||
// Use a raw query to perform a bulk insert with an ON CONFLICT clause
|
||||
await tx.$executeRaw`
|
||||
INSERT INTO "ContactAttribute" (
|
||||
"id", "created_at", "updated_at", "contactId", "value", "attributeKeyId"
|
||||
)
|
||||
@@ -356,33 +357,35 @@ export const upsertBulkContacts = async (
|
||||
"value" = EXCLUDED."value",
|
||||
"updated_at" = EXCLUDED."updated_at"
|
||||
`;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
contactCache.revalidate({
|
||||
environmentId,
|
||||
});
|
||||
|
||||
// revalidate all the new contacts:
|
||||
for (const newContact of newContacts) {
|
||||
contactCache.revalidate({
|
||||
id: newContact.id,
|
||||
environmentId,
|
||||
});
|
||||
}
|
||||
|
||||
// revalidate all the existing contacts:
|
||||
for (const existingContact of existingContactsByEmail) {
|
||||
contactCache.revalidate({
|
||||
id: existingContact.id,
|
||||
// revalidate all the new contacts:
|
||||
for (const newContact of newContacts) {
|
||||
contactCache.revalidate({
|
||||
id: newContact.id,
|
||||
});
|
||||
}
|
||||
|
||||
// revalidate all the existing contacts:
|
||||
for (const existingContact of existingContactsByEmail) {
|
||||
contactCache.revalidate({
|
||||
id: existingContact.id,
|
||||
});
|
||||
}
|
||||
|
||||
contactAttributeKeyCache.revalidate({
|
||||
environmentId,
|
||||
});
|
||||
}
|
||||
|
||||
contactAttributeKeyCache.revalidate({
|
||||
environmentId,
|
||||
});
|
||||
|
||||
contactAttributeCache.revalidate({ environmentId });
|
||||
});
|
||||
contactAttributeCache.revalidate({ environmentId });
|
||||
},
|
||||
{ timeout: 10 * 1000 } // 10 seconds timeout
|
||||
);
|
||||
|
||||
return ok({
|
||||
contactIdxWithConflictingUserIds,
|
||||
|
||||
@@ -126,7 +126,7 @@ export const ZContactBulkUploadRequest = z.object({
|
||||
environmentId: z.string().cuid2(),
|
||||
contacts: z
|
||||
.array(ZContactBulkUploadContact)
|
||||
.max(1000, { message: "Maximum 1000 contacts allowed at a time." })
|
||||
.max(250, { message: "Maximum 250 contacts allowed at a time." })
|
||||
.superRefine((contacts, ctx) => {
|
||||
// Track all data in a single pass
|
||||
const seenEmails = new Set<string>();
|
||||
|
||||
Reference in New Issue
Block a user