feat: migrate integration configs from questions to elements (#6906)

Co-authored-by: pandeymangg <anshuman.pandey9999@gmail.com>
This commit is contained in:
Matti Nannt
2025-11-28 18:07:58 +01:00
committed by GitHub
parent e2fd71abfd
commit c53e4f54cb
3 changed files with 534 additions and 58 deletions

View File

@@ -1,8 +1,15 @@
import { createId } from "@paralleldrive/cuid2";
import { logger } from "@formbricks/logger";
import type { MigrationScript } from "../../src/scripts/migration-runner";
import type { Block, CTAMigrationStats, SurveyRecord } from "./types";
import { migrateQuestionsSurveyToBlocks } from "./utils";
import type {
Block,
CTAMigrationStats,
IntegrationConfig,
IntegrationMigrationStats,
MigratedIntegration,
SurveyRecord,
} from "./types";
import { migrateIntegrationConfig, migrateQuestionsSurveyToBlocks } from "./utils";
export const migrateQuestionsToBlocks: MigrationScript = {
type: "data",
@@ -25,71 +32,198 @@ export const migrateQuestionsToBlocks: MigrationScript = {
if (surveys.length === 0) {
logger.info("No surveys found that need migration");
return;
}
} else {
logger.info(`Found ${surveys.length.toString()} surveys to migrate`);
logger.info(`Found ${surveys.length.toString()} surveys to migrate`);
// 2. Process each survey
const updates: { id: string; blocks: Block[] }[] = [];
// 2. Process each survey
const updates: { id: string; blocks: Block[] }[] = [];
for (const survey of surveys) {
try {
const migrated = migrateQuestionsSurveyToBlocks(survey, createId, ctaStats);
updates.push({
id: migrated.id,
blocks: migrated.blocks,
});
} catch (error) {
logger.error(error, `Failed to migrate survey ${survey.id}`);
throw new Error(
`Migration failed for survey ${survey.id}: ${error instanceof Error ? error.message : String(error)}`
);
}
}
logger.info(`Successfully processed ${updates.length.toString()} surveys`);
// 3. Update surveys individually for safety (avoids SQL injection risks with complex JSONB arrays)
let updatedCount = 0;
for (const update of updates) {
try {
// PostgreSQL requires proper array format for jsonb[]
// We need to convert the JSON array to a PostgreSQL jsonb array using array_to_json
// The trick is to use jsonb_array_elements to convert the JSON array into rows, then array_agg to collect them back
await tx.$executeRawUnsafe(
`UPDATE "Survey"
SET blocks = (
SELECT array_agg(elem)
FROM jsonb_array_elements($1::jsonb) AS elem
),
questions = '[]'::jsonb
WHERE id = $2`,
JSON.stringify(update.blocks),
update.id
);
updatedCount++;
// Log progress every 10000 surveys
if (updatedCount % 10000 === 0) {
logger.info(`Progress: ${updatedCount.toString()}/${updates.length.toString()} surveys updated`);
for (const survey of surveys) {
try {
const migrated = migrateQuestionsSurveyToBlocks(survey, createId, ctaStats);
updates.push({
id: migrated.id,
blocks: migrated.blocks,
});
} catch (error) {
logger.error(error, `Failed to migrate survey ${survey.id}`);
throw new Error(
`Migration failed for survey ${survey.id}: ${error instanceof Error ? error.message : String(error)}`
);
}
} catch (error) {
logger.error(error, `Failed to update survey ${update.id} in database`);
throw new Error(
`Database update failed for survey ${update.id}: ${error instanceof Error ? error.message : String(error)}`
}
logger.info(`Successfully processed ${updates.length.toString()} surveys`);
// 3. Update surveys in batches using UNNEST for performance
// Batch size of 150 balances performance with query size safety (~7.5MB per batch)
const SURVEY_BATCH_SIZE = 150;
let updatedCount = 0;
for (let i = 0; i < updates.length; i += SURVEY_BATCH_SIZE) {
const batch = updates.slice(i, i + SURVEY_BATCH_SIZE);
try {
// Build arrays for batch update
const ids = batch.map((u) => u.id);
const blocksJsonStrings = batch.map((u) => JSON.stringify(u.blocks));
// Use UNNEST to update multiple surveys in a single query
await tx.$executeRawUnsafe(
`UPDATE "Survey" AS s
SET
blocks = (
SELECT array_agg(elem)
FROM jsonb_array_elements(data.blocks_json::jsonb) AS elem
),
questions = '[]'::jsonb
FROM (
SELECT
unnest($1::text[]) AS id,
unnest($2::text[]) AS blocks_json
) AS data
WHERE s.id = data.id`,
ids,
blocksJsonStrings
);
updatedCount += batch.length;
// Log progress
logger.info(`Progress: ${updatedCount.toString()}/${updates.length.toString()} surveys updated`);
} catch (error) {
logger.error(error, `Failed to update survey batch starting at index ${i.toString()}`);
throw new Error(
`Database batch update failed at index ${i.toString()}: ${error instanceof Error ? error.message : String(error)}`
);
}
}
logger.info(`Migration complete: ${updatedCount.toString()} surveys migrated to blocks`);
// 4. Log CTA migration statistics
if (ctaStats.totalCTAElements > 0) {
logger.info(
`CTA elements processed: ${ctaStats.totalCTAElements.toString()} total (${ctaStats.ctaWithExternalLink.toString()} with external link, ${ctaStats.ctaWithoutExternalLink.toString()} without)`
);
}
}
logger.info(`Migration complete: ${updatedCount.toString()} surveys migrated to blocks`);
// 5. Migrate Integration configs
logger.info("Starting integration config migration");
// Initialize integration statistics
const integrationStats: IntegrationMigrationStats = {
totalIntegrations: 0,
googleSheets: { processed: 0, skipped: 0 },
airtable: { processed: 0, skipped: 0 },
slack: { processed: 0, skipped: 0 },
notion: { processed: 0, skipped: 0 },
n8n: { skipped: 0 },
errors: 0,
};
// Query all integrations
const integrations = await tx.$queryRaw<{ id: string; type: string; config: IntegrationConfig }[]>`
SELECT id, type, config
FROM "Integration"
`;
integrationStats.totalIntegrations = integrations.length;
if (integrations.length === 0) {
logger.info("No integrations found to migrate");
} else {
logger.info(`Found ${integrations.length.toString()} integrations to process`);
// Process integrations in memory
const integrationUpdates: MigratedIntegration[] = [];
for (const integration of integrations) {
try {
// Config is JSON from database - cast to IntegrationConfig for runtime processing
const result = migrateIntegrationConfig(integration.type, integration.config);
// Track statistics
const typeStats = integrationStats[integration.type as keyof typeof integrationStats];
if (typeStats && typeof typeStats === "object" && "processed" in typeStats) {
if (result.migrated) {
typeStats.processed++;
integrationUpdates.push({
id: integration.id,
config: result.config,
});
} else {
typeStats.skipped++;
}
} else if (integration.type === "n8n") {
integrationStats.n8n.skipped++;
}
} catch (error) {
integrationStats.errors++;
logger.error(error, `Failed to migrate integration ${integration.id} (type: ${integration.type})`);
throw new Error(
`Migration failed for integration ${integration.id}: ${error instanceof Error ? error.message : String(error)}`
);
}
}
// 4. Log CTA migration statistics
if (ctaStats.totalCTAElements > 0) {
logger.info(
`CTA elements processed: ${ctaStats.totalCTAElements.toString()} total (${ctaStats.ctaWithExternalLink.toString()} with external link, ${ctaStats.ctaWithoutExternalLink.toString()} without)`
`Processed ${integrations.length.toString()} integrations: ${integrationUpdates.length.toString()} to update, ${(integrations.length - integrationUpdates.length).toString()} skipped`
);
// Update integrations using Promise.all for better throughput
if (integrationUpdates.length > 0) {
// Batch size of 150 provides good parallelization (~750KB per batch)
const INTEGRATION_BATCH_SIZE = 150;
let integrationUpdatedCount = 0;
for (let i = 0; i < integrationUpdates.length; i += INTEGRATION_BATCH_SIZE) {
const batch = integrationUpdates.slice(i, i + INTEGRATION_BATCH_SIZE);
try {
// Execute all updates in parallel for this batch
await Promise.all(
batch.map((update) =>
tx.$executeRawUnsafe(
`UPDATE "Integration"
SET config = $1::jsonb
WHERE id = $2`,
JSON.stringify(update.config),
update.id
)
)
);
integrationUpdatedCount += batch.length;
// Log progress
logger.info(
`Integration progress: ${integrationUpdatedCount.toString()}/${integrationUpdates.length.toString()} updated`
);
} catch (error) {
logger.error(error, `Failed to update integration batch starting at index ${i.toString()}`);
throw new Error(
`Database update failed for integration batch at index ${i.toString()}: ${error instanceof Error ? error.message : String(error)}`
);
}
}
logger.info(
`Integration migration complete: ${integrationUpdatedCount.toString()} integrations updated`
);
} else {
logger.info("No integrations needed updating (all already migrated or skipped)");
}
// Log detailed statistics
logger.info(
`Integration statistics: ` +
`GoogleSheets: ${integrationStats.googleSheets.processed.toString()} migrated, ${integrationStats.googleSheets.skipped.toString()} skipped | ` +
`Airtable: ${integrationStats.airtable.processed.toString()} migrated, ${integrationStats.airtable.skipped.toString()} skipped | ` +
`Slack: ${integrationStats.slack.processed.toString()} migrated, ${integrationStats.slack.skipped.toString()} skipped | ` +
`Notion: ${integrationStats.notion.processed.toString()} migrated, ${integrationStats.notion.skipped.toString()} skipped | ` +
`n8n: ${integrationStats.n8n.skipped.toString()} skipped`
);
}

View File

@@ -76,6 +76,151 @@ export interface CTAMigrationStats {
ctaWithoutExternalLink: number;
}
// Base integration config data (shared between all integrations except Notion).
// Both the legacy (questionIds/questions) and migrated (elementIds/elements)
// field names are declared optional so one type can describe records before
// and after the questions -> elements migration.
export interface IntegrationBaseSurveyData {
  createdAt: Date;
  surveyId: string;
  surveyName: string;
  // Old format fields (pre-migration)
  questionIds?: string[];
  questions?: string;
  // New format fields (post-migration)
  elementIds?: string[];
  elements?: string;
  // Optional export flags; carried over unchanged by the migration
  includeVariables?: boolean;
  includeHiddenFields?: boolean;
  includeMetadata?: boolean;
  includeCreatedAt?: boolean;
}
// Google Sheets specific config: base survey data plus the target spreadsheet.
export interface GoogleSheetsConfigData extends IntegrationBaseSurveyData {
  spreadsheetId: string;
  spreadsheetName: string;
}

// Full Google Sheets integration config: OAuth credentials, one data entry
// per linked survey, and the account email.
export interface GoogleSheetsConfig {
  key: {
    token_type: "Bearer";
    access_token: string;
    scope: string;
    expiry_date: number;
    refresh_token: string;
  };
  data: GoogleSheetsConfigData[];
  email: string;
}
// Airtable specific config: base survey data plus the target base/table.
export interface AirtableConfigData extends IntegrationBaseSurveyData {
  tableId: string;
  baseId: string;
  tableName: string;
}

// Full Airtable integration config: OAuth credentials, one data entry per
// linked survey, and the account email.
export interface AirtableConfig {
  key: {
    expiry_date: string;
    access_token: string;
    refresh_token: string;
  };
  data: AirtableConfigData[];
  email: string;
}
// Slack specific config: base survey data plus the target channel.
export interface SlackConfigData extends IntegrationBaseSurveyData {
  channelId: string;
  channelName: string;
}

// Full Slack integration config: bot credentials and one data entry per
// linked survey (no email field, unlike Google Sheets/Airtable).
export interface SlackConfig {
  key: {
    app_id: string;
    authed_user: { id: string };
    token_type: "bot";
    access_token: string;
    bot_user_id: string;
    team: { id: string; name: string };
  };
  data: SlackConfigData[];
}
// Notion specific config (different structure - uses a per-column mapping
// instead of elementIds/elements lists).
// One mapping item links a survey question/element to a Notion database column.
// `question` is the legacy key, `element` the migrated one; both optional so
// the type covers records before and after migration.
export interface NotionMappingItem {
  // Old format
  question?: { id: string; name: string; type: string };
  // New format
  element?: { id: string; name: string; type: string };
  column: { id: string; name: string; type: string };
}

// One linked survey plus its target Notion database and column mapping.
export interface NotionConfigData {
  createdAt: Date;
  surveyId: string;
  surveyName: string;
  mapping: NotionMappingItem[];
  databaseId: string;
  databaseName: string;
}

// Full Notion integration config: OAuth/workspace credentials and one data
// entry per linked survey.
export interface NotionConfig {
  key: {
    access_token: string;
    bot_id: string;
    token_type: string;
    duplicated_template_id: string | null;
    owner: {
      type: string;
      workspace?: boolean | null;
      user: {
        id: string;
        name?: string | null;
        type?: string | null;
        object: string;
        person?: { email: string } | null;
        avatar_url?: string | null;
      } | null;
    };
    workspace_icon: string | null;
    workspace_id: string;
    workspace_name: string | null;
  };
  data: NotionConfigData[];
}
// Union type for all integration configs.
// Record<string, unknown> keeps the union open for configs (e.g. n8n or
// future integration types) that match none of the known shapes.
export type IntegrationConfig =
  | GoogleSheetsConfig
  | AirtableConfig
  | SlackConfig
  | NotionConfig
  | Record<string, unknown>;

// Integration migration types

// Row shape read from the Integration table during migration.
export interface IntegrationRecord {
  id: string;
  type: string;
  config: IntegrationConfig;
}

// An integration whose config was rewritten and must be persisted back.
export interface MigratedIntegration {
  id: string;
  config: IntegrationConfig;
}

// Per-type counters reported at the end of the integration migration.
export interface IntegrationMigrationStats {
  totalIntegrations: number;
  googleSheets: { processed: number; skipped: number };
  airtable: { processed: number; skipped: number };
  slack: { processed: number; skipped: number };
  notion: { processed: number; skipped: number };
  // n8n has no config to migrate, so it is only ever counted as skipped
  n8n: { skipped: number };
  errors: number;
}
// Type guards
export const isSingleCondition = (condition: Condition): condition is SingleCondition => {
return "leftOperand" in condition && "operator" in condition;

View File

@@ -3,8 +3,10 @@ import {
type CTAMigrationStats,
type Condition,
type ConditionGroup,
type IntegrationConfig,
type LogicAction,
type MigratedSurvey,
type NotionConfig,
type SingleCondition,
type SurveyLogic,
type SurveyQuestion,
@@ -414,3 +416,198 @@ export const migrateQuestionsSurveyToBlocks = (
blocks,
};
};
// Shape of an integration config whose `data` property is an array of records.
interface ConfigWithData {
  data: Record<string, unknown>[];
  [key: string]: unknown;
}

// Runtime guard: true when `config` is a non-null object whose `data`
// property is an array.
const hasDataArray = (config: unknown): config is ConfigWithData => {
  if (typeof config !== "object" || config === null) {
    return false;
  }
  const candidate = config as Record<string, unknown>;
  return Array.isArray(candidate.data);
};
/**
 * A config item is considered already migrated once either of the
 * new-format keys (elementIds / elements) is present on it.
 */
const isAlreadyMigrated = (item: Record<string, unknown>): boolean =>
  ["elementIds", "elements"].some((key) => key in item);
/**
 * A config item still needs migration while either of the legacy
 * keys (questionIds / questions) is present on it.
 */
const needsMigration = (item: Record<string, unknown>): boolean =>
  ["questionIds", "questions"].some((key) => key in item);
/**
 * Migrate an Airtable / Google Sheets / Slack config (they share the same
 * base data shape) by renaming the legacy questionIds/questions fields to
 * elementIds/elements on every entry of config.data.
 * Entries that already carry the new field names, and entries with nothing
 * to rename, pass through untouched, which keeps the migration idempotent.
 * @returns migrated flag (true when at least one entry was rewritten) and the resulting config
 */
export const migrateSharedIntegrationConfig = (
  config: IntegrationConfig
): { migrated: boolean; config: IntegrationConfig } => {
  // Bail out when the config does not expose a data array at all.
  if (!hasDataArray(config)) {
    return { migrated: false, config };
  }
  let touchedAny = false;
  const rewrittenData = config.data.map((entry) => {
    // Skip entries already on the new format, or with no legacy fields to rename.
    if (isAlreadyMigrated(entry) || !needsMigration(entry)) {
      return entry;
    }
    touchedAny = true;
    // Drop the legacy keys via rest-destructuring; every other field
    // (includeVariables, etc.) is preserved automatically.
    const { questionIds, questions, ...rest } = entry;
    const rewritten: Record<string, unknown> = { ...rest };
    if ("questionIds" in entry) {
      rewritten.elementIds = questionIds;
    }
    if ("questions" in entry) {
      rewritten.elements = questions;
    }
    return rewritten;
  });
  return {
    migrated: touchedAny,
    config: { ...config, data: rewrittenData },
  };
};
// Runtime guard for a Notion config: any non-null object whose `data`
// property is an array (structurally identical to hasDataArray, but
// narrows to the Notion-specific type).
const isNotionConfig = (config: unknown): config is NotionConfig => {
  if (typeof config !== "object" || config === null) {
    return false;
  }
  return "data" in config && Array.isArray((config as NotionConfig).data);
};
// One Notion column mapping, covering both the legacy (`question`) and
// migrated (`element`) key names.
interface NotionMappingEntry {
  question?: { id: string; name: string; type: string };
  element?: { id: string; name: string; type: string };
  column: { id: string; name: string; type: string };
}

/**
 * True when at least one mapping entry still uses the legacy `question`
 * key without a corresponding `element` key.
 * @param mapping - Notion mapping entries (may be absent or malformed)
 * @returns boolean
 */
const needsNotionMigration = (mapping: NotionMappingEntry[] | undefined): boolean => {
  if (!Array.isArray(mapping) || mapping.length === 0) {
    return false;
  }
  for (const entry of mapping) {
    if ("question" in entry && !("element" in entry)) {
      return true;
    }
  }
  return false;
};
/**
 * Migrate a Notion config, whose entries use a custom `mapping` array:
 * every mapping entry carrying a legacy `question` key (and no `element`
 * key yet) is rewritten so the same value lives under `element`.
 * Entries already on the new format, and entries with neither key, pass
 * through untouched.
 * @param config - Notion config
 * @returns \{ migrated: boolean; config: IntegrationConfig \}
 */
export const migrateNotionIntegrationConfig = (
  config: IntegrationConfig
): { migrated: boolean; config: IntegrationConfig } => {
  // Not a recognizable Notion config - hand it back untouched.
  if (!isNotionConfig(config)) {
    return { migrated: false, config };
  }
  let touchedAny = false;
  const rewrittenData = config.data.map((item) => {
    // The stored mapping may use either the legacy or the migrated key names,
    // so widen to the migration-era type before inspecting it.
    const mapping = item.mapping as NotionMappingEntry[] | undefined;
    if (!needsNotionMigration(mapping)) {
      return item;
    }
    touchedAny = true;
    // Rewrite each entry independently; only entries with a bare `question`
    // key are changed.
    const rewrittenMapping = mapping?.map((entry) => {
      if ("element" in entry) {
        return entry;
      }
      if ("question" in entry) {
        const { question, ...rest } = entry;
        return { ...rest, element: question };
      }
      return entry;
    });
    return { ...item, mapping: rewrittenMapping };
  });
  return {
    migrated: touchedAny,
    config: { ...config, data: rewrittenData },
  };
};
/**
 * Dispatch integration-config migration by integration type.
 * googleSheets/airtable/slack share one base data shape; notion has its own
 * mapping structure; n8n (and any unknown type) has nothing to migrate.
 * @param type - Integration type
 * @param config - Integration config
 * @returns \{ migrated: boolean; config: IntegrationConfig \}
 */
export const migrateIntegrationConfig = (
  type: string,
  config: IntegrationConfig
): { migrated: boolean; config: IntegrationConfig } => {
  if (type === "googleSheets" || type === "airtable" || type === "slack") {
    return migrateSharedIntegrationConfig(config);
  }
  if (type === "notion") {
    return migrateNotionIntegrationConfig(config);
  }
  // n8n has no config schema to migrate; unknown types are returned unchanged.
  return { migrated: false, config };
};