mirror of
https://github.com/formbricks/formbricks.git
synced 2026-01-06 05:40:02 -06:00
Compare commits
2 Commits
better-ee-
...
fix/bulk-c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9c0306c27b | ||
|
|
62960686a1 |
@@ -229,51 +229,52 @@ export const upsertBulkContacts = async (
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
// Execute everything in ONE transaction
|
// Execute everything in ONE transaction
|
||||||
await prisma.$transaction(async (tx) => {
|
await prisma.$transaction(
|
||||||
const attributeKeyMap = existingAttributeKeys.reduce<Record<string, string>>((acc, keyObj) => {
|
async (tx) => {
|
||||||
acc[keyObj.key] = keyObj.id;
|
const attributeKeyMap = existingAttributeKeys.reduce<Record<string, string>>((acc, keyObj) => {
|
||||||
return acc;
|
acc[keyObj.key] = keyObj.id;
|
||||||
}, {});
|
return acc;
|
||||||
|
}, {});
|
||||||
|
|
||||||
// Check for missing attribute keys and create them if needed.
|
// Check for missing attribute keys and create them if needed.
|
||||||
const missingKeysMap = new Map<string, { key: string; name: string }>();
|
const missingKeysMap = new Map<string, { key: string; name: string }>();
|
||||||
const attributeKeyNameUpdates = new Map<string, { key: string; name: string }>();
|
const attributeKeyNameUpdates = new Map<string, { key: string; name: string }>();
|
||||||
|
|
||||||
for (const contact of filteredContacts) {
|
for (const contact of filteredContacts) {
|
||||||
for (const attr of contact.attributes) {
|
for (const attr of contact.attributes) {
|
||||||
if (!attributeKeyMap[attr.attributeKey.key]) {
|
if (!attributeKeyMap[attr.attributeKey.key]) {
|
||||||
missingKeysMap.set(attr.attributeKey.key, attr.attributeKey);
|
missingKeysMap.set(attr.attributeKey.key, attr.attributeKey);
|
||||||
} else {
|
} else {
|
||||||
// Check if the name has changed for existing attribute keys
|
// Check if the name has changed for existing attribute keys
|
||||||
const existingKey = existingAttributeKeys.find((ak) => ak.key === attr.attributeKey.key);
|
const existingKey = existingAttributeKeys.find((ak) => ak.key === attr.attributeKey.key);
|
||||||
if (existingKey && existingKey.name !== attr.attributeKey.name) {
|
if (existingKey && existingKey.name !== attr.attributeKey.name) {
|
||||||
attributeKeyNameUpdates.set(attr.attributeKey.key, attr.attributeKey);
|
attributeKeyNameUpdates.set(attr.attributeKey.key, attr.attributeKey);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// Handle both missing keys and name updates in a single batch operation
|
// Handle both missing keys and name updates in a single batch operation
|
||||||
const keysToUpsert = new Map<string, { key: string; name: string }>();
|
const keysToUpsert = new Map<string, { key: string; name: string }>();
|
||||||
|
|
||||||
// Collect all keys that need to be created or updated
|
// Collect all keys that need to be created or updated
|
||||||
for (const [key, value] of missingKeysMap) {
|
for (const [key, value] of missingKeysMap) {
|
||||||
keysToUpsert.set(key, value);
|
keysToUpsert.set(key, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (const [key, value] of attributeKeyNameUpdates) {
|
for (const [key, value] of attributeKeyNameUpdates) {
|
||||||
keysToUpsert.set(key, value);
|
keysToUpsert.set(key, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (keysToUpsert.size > 0) {
|
if (keysToUpsert.size > 0) {
|
||||||
const keysArray = Array.from(keysToUpsert.values());
|
const keysArray = Array.from(keysToUpsert.values());
|
||||||
const BATCH_SIZE = 10000;
|
const BATCH_SIZE = 10000;
|
||||||
|
|
||||||
for (let i = 0; i < keysArray.length; i += BATCH_SIZE) {
|
for (let i = 0; i < keysArray.length; i += BATCH_SIZE) {
|
||||||
const batch = keysArray.slice(i, i + BATCH_SIZE);
|
const batch = keysArray.slice(i, i + BATCH_SIZE);
|
||||||
|
|
||||||
// Use raw query to perform upsert
|
// Use raw query to perform upsert
|
||||||
const upsertedKeys = await tx.$queryRaw<{ id: string; key: string }[]>`
|
const upsertedKeys = await tx.$queryRaw<{ id: string; key: string }[]>`
|
||||||
INSERT INTO "ContactAttributeKey" ("id", "key", "name", "environmentId", "created_at", "updated_at")
|
INSERT INTO "ContactAttributeKey" ("id", "key", "name", "environmentId", "created_at", "updated_at")
|
||||||
SELECT
|
SELECT
|
||||||
unnest(${Prisma.sql`ARRAY[${batch.map(() => createId())}]`}),
|
unnest(${Prisma.sql`ARRAY[${batch.map(() => createId())}]`}),
|
||||||
@@ -289,59 +290,59 @@ export const upsertBulkContacts = async (
|
|||||||
RETURNING "id", "key"
|
RETURNING "id", "key"
|
||||||
`;
|
`;
|
||||||
|
|
||||||
// Update attribute key map with upserted keys
|
// Update attribute key map with upserted keys
|
||||||
for (const key of upsertedKeys) {
|
for (const key of upsertedKeys) {
|
||||||
attributeKeyMap[key.key] = key.id;
|
attributeKeyMap[key.key] = key.id;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// Create new contacts -- should be at most 1000, no need to batch
|
// Create new contacts -- should be at most 1000, no need to batch
|
||||||
const newContacts = contactsToCreate.map(() => ({
|
const newContacts = contactsToCreate.map(() => ({
|
||||||
id: createId(),
|
|
||||||
environmentId,
|
|
||||||
}));
|
|
||||||
|
|
||||||
if (newContacts.length > 0) {
|
|
||||||
await tx.contact.createMany({
|
|
||||||
data: newContacts,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// Prepare attributes for both new and existing contacts
|
|
||||||
const attributesUpsertForCreatedUsers = contactsToCreate.flatMap((contact, idx) =>
|
|
||||||
contact.attributes.map((attr) => ({
|
|
||||||
id: createId(),
|
id: createId(),
|
||||||
contactId: newContacts[idx].id,
|
environmentId,
|
||||||
attributeKeyId: attributeKeyMap[attr.attributeKey.key],
|
}));
|
||||||
value: attr.value,
|
|
||||||
createdAt: new Date(),
|
|
||||||
updatedAt: new Date(),
|
|
||||||
}))
|
|
||||||
);
|
|
||||||
|
|
||||||
const attributesUpsertForExistingUsers = contactsToUpdate.flatMap((contact) =>
|
if (newContacts.length > 0) {
|
||||||
contact.attributes.map((attr) => ({
|
await tx.contact.createMany({
|
||||||
id: attr.id,
|
data: newContacts,
|
||||||
contactId: contact.contactId,
|
});
|
||||||
attributeKeyId: attributeKeyMap[attr.attributeKey.key],
|
}
|
||||||
value: attr.value,
|
|
||||||
createdAt: attr.createdAt,
|
|
||||||
updatedAt: new Date(),
|
|
||||||
}))
|
|
||||||
);
|
|
||||||
|
|
||||||
const attributesToUpsert = [...attributesUpsertForCreatedUsers, ...attributesUpsertForExistingUsers];
|
// Prepare attributes for both new and existing contacts
|
||||||
|
const attributesUpsertForCreatedUsers = contactsToCreate.flatMap((contact, idx) =>
|
||||||
|
contact.attributes.map((attr) => ({
|
||||||
|
id: createId(),
|
||||||
|
contactId: newContacts[idx].id,
|
||||||
|
attributeKeyId: attributeKeyMap[attr.attributeKey.key],
|
||||||
|
value: attr.value,
|
||||||
|
createdAt: new Date(),
|
||||||
|
updatedAt: new Date(),
|
||||||
|
}))
|
||||||
|
);
|
||||||
|
|
||||||
// Skip the raw query if there are no attributes to upsert
|
const attributesUpsertForExistingUsers = contactsToUpdate.flatMap((contact) =>
|
||||||
if (attributesToUpsert.length > 0) {
|
contact.attributes.map((attr) => ({
|
||||||
// Process attributes in batches of 10,000
|
id: attr.id,
|
||||||
const BATCH_SIZE = 10000;
|
contactId: contact.contactId,
|
||||||
for (let i = 0; i < attributesToUpsert.length; i += BATCH_SIZE) {
|
attributeKeyId: attributeKeyMap[attr.attributeKey.key],
|
||||||
const batch = attributesToUpsert.slice(i, i + BATCH_SIZE);
|
value: attr.value,
|
||||||
|
createdAt: attr.createdAt,
|
||||||
|
updatedAt: new Date(),
|
||||||
|
}))
|
||||||
|
);
|
||||||
|
|
||||||
// Use a raw query to perform a bulk insert with an ON CONFLICT clause
|
const attributesToUpsert = [...attributesUpsertForCreatedUsers, ...attributesUpsertForExistingUsers];
|
||||||
await tx.$executeRaw`
|
|
||||||
|
// Skip the raw query if there are no attributes to upsert
|
||||||
|
if (attributesToUpsert.length > 0) {
|
||||||
|
// Process attributes in batches of 10,000
|
||||||
|
const BATCH_SIZE = 10000;
|
||||||
|
for (let i = 0; i < attributesToUpsert.length; i += BATCH_SIZE) {
|
||||||
|
const batch = attributesToUpsert.slice(i, i + BATCH_SIZE);
|
||||||
|
|
||||||
|
// Use a raw query to perform a bulk insert with an ON CONFLICT clause
|
||||||
|
await tx.$executeRaw`
|
||||||
INSERT INTO "ContactAttribute" (
|
INSERT INTO "ContactAttribute" (
|
||||||
"id", "created_at", "updated_at", "contactId", "value", "attributeKeyId"
|
"id", "created_at", "updated_at", "contactId", "value", "attributeKeyId"
|
||||||
)
|
)
|
||||||
@@ -356,33 +357,35 @@ export const upsertBulkContacts = async (
|
|||||||
"value" = EXCLUDED."value",
|
"value" = EXCLUDED."value",
|
||||||
"updated_at" = EXCLUDED."updated_at"
|
"updated_at" = EXCLUDED."updated_at"
|
||||||
`;
|
`;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
contactCache.revalidate({
|
|
||||||
environmentId,
|
|
||||||
});
|
|
||||||
|
|
||||||
// revalidate all the new contacts:
|
|
||||||
for (const newContact of newContacts) {
|
|
||||||
contactCache.revalidate({
|
contactCache.revalidate({
|
||||||
id: newContact.id,
|
environmentId,
|
||||||
});
|
});
|
||||||
}
|
|
||||||
|
|
||||||
// revalidate all the existing contacts:
|
// revalidate all the new contacts:
|
||||||
for (const existingContact of existingContactsByEmail) {
|
for (const newContact of newContacts) {
|
||||||
contactCache.revalidate({
|
contactCache.revalidate({
|
||||||
id: existingContact.id,
|
id: newContact.id,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// revalidate all the existing contacts:
|
||||||
|
for (const existingContact of existingContactsByEmail) {
|
||||||
|
contactCache.revalidate({
|
||||||
|
id: existingContact.id,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
contactAttributeKeyCache.revalidate({
|
||||||
|
environmentId,
|
||||||
});
|
});
|
||||||
}
|
|
||||||
|
|
||||||
contactAttributeKeyCache.revalidate({
|
contactAttributeCache.revalidate({ environmentId });
|
||||||
environmentId,
|
},
|
||||||
});
|
{ timeout: 10 * 1000 } // 10 seconds timeout
|
||||||
|
);
|
||||||
contactAttributeCache.revalidate({ environmentId });
|
|
||||||
});
|
|
||||||
|
|
||||||
return ok({
|
return ok({
|
||||||
contactIdxWithConflictingUserIds,
|
contactIdxWithConflictingUserIds,
|
||||||
|
|||||||
@@ -126,7 +126,7 @@ export const ZContactBulkUploadRequest = z.object({
|
|||||||
environmentId: z.string().cuid2(),
|
environmentId: z.string().cuid2(),
|
||||||
contacts: z
|
contacts: z
|
||||||
.array(ZContactBulkUploadContact)
|
.array(ZContactBulkUploadContact)
|
||||||
.max(1000, { message: "Maximum 1000 contacts allowed at a time." })
|
.max(250, { message: "Maximum 250 contacts allowed at a time." })
|
||||||
.superRefine((contacts, ctx) => {
|
.superRefine((contacts, ctx) => {
|
||||||
// Track all data in a single pass
|
// Track all data in a single pass
|
||||||
const seenEmails = new Set<string>();
|
const seenEmails = new Set<string>();
|
||||||
|
|||||||
Reference in New Issue
Block a user