From f7a28f8cb3115a76f1270421df5ba4b1fb4ec7f5 Mon Sep 17 00:00:00 2001 From: pandeymangg Date: Tue, 3 Feb 2026 22:12:37 +0530 Subject: [PATCH] feat: connectors --- .env.example | 12 +- .../workspace/unify/sources/actions.ts | 160 +++++ .../components/create-source-modal.tsx | 333 +++++++-- .../sources/components/edit-source-modal.tsx | 366 +++++++++- .../components/formbricks-survey-selector.tsx | 150 ++-- .../components/sources-page-client.tsx | 277 +++++++- .../sources/components/sources-table.tsx | 10 +- .../unify/sources/components/types.ts | 157 ++++- apps/web/app/api/(internal)/pipeline/route.ts | 9 + .../webhook-listener/[sessionId]/route.ts | 102 +++ .../api/unify/webhook/[connectorId]/route.ts | 276 ++++++++ apps/web/lib/connector/actions.ts | 454 ++++++++++++ apps/web/lib/connector/hub-client.ts | 315 +++++++++ apps/web/lib/connector/pipeline-handler.ts | 138 ++++ apps/web/lib/connector/service.ts | 663 ++++++++++++++++++ apps/web/lib/connector/transform.ts | 375 ++++++++++ .../lib/connector/webhook-listener-store.ts | 155 ++++ apps/web/lib/utils/helper.ts | 29 + apps/web/lib/utils/services.ts | 22 + .../migration.sql | 81 +++ packages/database/schema.prisma | 106 ++- packages/types/connector.ts | 235 +++++++ 22 files changed, 4243 insertions(+), 182 deletions(-) create mode 100644 apps/web/app/(app)/environments/[environmentId]/workspace/unify/sources/actions.ts create mode 100644 apps/web/app/api/unify/webhook-listener/[sessionId]/route.ts create mode 100644 apps/web/app/api/unify/webhook/[connectorId]/route.ts create mode 100644 apps/web/lib/connector/actions.ts create mode 100644 apps/web/lib/connector/hub-client.ts create mode 100644 apps/web/lib/connector/pipeline-handler.ts create mode 100644 apps/web/lib/connector/service.ts create mode 100644 apps/web/lib/connector/transform.ts create mode 100644 apps/web/lib/connector/webhook-listener-store.ts create mode 100644 packages/database/migration/20260203135615_added_connector_model/migration.sql create mode 100644 packages/types/connector.ts diff --git a/.env.example b/.env.example index 99bedcd926..fbe29f4356 100644 --- a/.env.example +++ b/.env.example @@ -225,4 +225,14 @@ REDIS_URL=redis://localhost:6379 # Lingo.dev API key for translation generation -LINGODOTDEV_API_KEY=your_api_key_here \ No newline at end of file +LINGODOTDEV_API_KEY=your_api_key_here + +# ############################################## +# Formbricks Hub API Configuration (for Unify connectors) +# ############################################## + +# The base URL of the Formbricks Hub API +# HUB_API_URL=http://localhost:8080 + +# API key for authenticating with the Hub API +# HUB_API_KEY= \ No newline at end of file diff --git a/apps/web/app/(app)/environments/[environmentId]/workspace/unify/sources/actions.ts b/apps/web/app/(app)/environments/[environmentId]/workspace/unify/sources/actions.ts new file mode 100644 index 0000000000..78dd833486 --- /dev/null +++ b/apps/web/app/(app)/environments/[environmentId]/workspace/unify/sources/actions.ts @@ -0,0 +1,160 @@ +"use server"; + +import { z } from "zod"; +import { ZId } from "@formbricks/types/common"; +import { TSurveyElementTypeEnum } from "@formbricks/types/surveys/constants"; +import { TSurvey } from "@formbricks/types/surveys/types"; +import { getSurveys } from "@/lib/survey/service"; +import { authenticatedActionClient } from "@/lib/utils/action-client"; +import { checkAuthorizationUpdated } from "@/lib/utils/action-client/action-client-middleware"; +import { getOrganizationIdFromEnvironmentId, getProjectIdFromEnvironmentId } from "@/lib/utils/helper"; +import { TUnifySurvey, TUnifySurveyElement } from "./components/types"; + +// Helper to extract elements from a survey (from blocks or legacy questions) +function extractElementsFromSurvey(survey: TSurvey): TUnifySurveyElement[] { + const elements: TUnifySurveyElement[] = []; + + // Try to get elements from blocks first (new structure) + if (survey.blocks && survey.blocks.length > 0) { + for (const block of survey.blocks) { + if (block.elements) { + for (const element of block.elements) { + // Skip non-question elements (like CTA) + if (element.type === TSurveyElementTypeEnum.CTA) continue; + + elements.push({ + id: element.id, + type: element.type, + headline: getElementHeadline(element), + required: element.required ?? false, + }); + } + } + } + } + + // Fallback to legacy questions if no blocks + if (elements.length === 0 && survey.questions && Array.isArray(survey.questions)) { + for (const question of survey.questions as Array<{ + id: string; + type: string; + headline?: string | { default?: string }; + required?: boolean; + }>) { + elements.push({ + id: question.id, + type: question.type, + headline: getQuestionHeadline(question), + required: question.required ?? false, + }); + } + } + + return elements; +} + +// Helper to strip HTML tags from a string +function stripHtmlTags(html: string): string { + // Remove HTML tags and decode common entities + return html + .replace(/<[^>]*>/g, "") // Remove HTML tags + .replace(/ /g, " ") // Replace non-breaking spaces + .replace(/&/g, "&") // Replace ampersands + .replace(/</g, "<") // Replace less than + .replace(/>/g, ">") // Replace greater than + .replace(/"/g, '"') // Replace quotes + .replace(/'/g, "'") // Replace apostrophes + .trim(); +} + +// Helper to get element headline (handles i18n structure) +function getElementHeadline(element: { headline?: string | { default?: string } }): string { + if (!element.headline) return "Untitled"; + let headline: string; + if (typeof element.headline === "string") { + headline = element.headline; + } else if (typeof element.headline === "object" && element.headline.default) { + headline = element.headline.default; + } else { + return "Untitled"; + } + // Strip HTML tags if present + return stripHtmlTags(headline); +} + +// Helper to get question headline (handles i18n structure) +function getQuestionHeadline(question: { headline?: string | { default?: string } }): string { + if (!question.headline) return "Untitled"; + let headline: string; + if (typeof question.headline === "string") { + headline = question.headline; + } else if (typeof question.headline === "object" && question.headline.default) { + headline = question.headline.default; + } else { + return "Untitled"; + } + // Strip HTML tags if present + return stripHtmlTags(headline); +} + +// Map survey status +function mapSurveyStatus(status: string): TUnifySurvey["status"] { + switch (status) { + case "inProgress": + return "active"; + case "paused": + return "paused"; + case "draft": + return "draft"; + case "completed": + return "completed"; + default: + return "draft"; + } +} + +// Transform TSurvey to TUnifySurvey for the UI +function transformToUnifySurvey(survey: TSurvey, responseCount: number): TUnifySurvey { + return { + id: survey.id, + name: survey.name, + status: mapSurveyStatus(survey.status), + responseCount, + elements: extractElementsFromSurvey(survey), + createdAt: survey.createdAt, + }; +} + +// Get surveys for environment action +const ZGetSurveysForUnifyAction = z.object({ + environmentId: ZId, +}); + +export const getSurveysForUnifyAction = authenticatedActionClient + .schema(ZGetSurveysForUnifyAction) + .action(async ({ ctx, parsedInput }): Promise => { + const organizationId = await getOrganizationIdFromEnvironmentId(parsedInput.environmentId); + await checkAuthorizationUpdated({ + userId: ctx.user.id, + organizationId, + access: [ + { + type: "organization", + roles: ["owner", "manager", "member"], + }, + { + type: "projectTeam", + minPermission: "read", + projectId: await getProjectIdFromEnvironmentId(parsedInput.environmentId), + }, + ], + }); + + // Get surveys from the database + const surveys = await getSurveys(parsedInput.environmentId); + + // Transform to TUnifySurvey format + // Note: We don't have response counts readily available, so using 0 for now + // In a production implementation, we'd fetch response counts separately + return surveys.map((survey) => transformToUnifySurvey(survey, 0)); + }); diff --git a/apps/web/app/(app)/environments/[environmentId]/workspace/unify/sources/components/create-source-modal.tsx b/apps/web/app/(app)/environments/[environmentId]/workspace/unify/sources/components/create-source-modal.tsx index 57524544b5..d14170b311 100644 --- a/apps/web/app/(app)/environments/[environmentId]/workspace/unify/sources/components/create-source-modal.tsx +++ b/apps/web/app/(app)/environments/[environmentId]/workspace/unify/sources/components/create-source-modal.tsx @@ -1,7 +1,9 @@ "use client"; -import { PlusIcon, SparklesIcon } from "lucide-react"; -import { useState } from "react"; +import { CheckIcon, CopyIcon, PlusIcon, SparklesIcon, WebhookIcon } from "lucide-react"; +import { nanoid } from "nanoid"; +import { useCallback, useEffect, useRef, useState } from "react"; +import { toast } from "react-hot-toast"; import { Badge } from "@/modules/ui/components/badge"; import { Button } from "@/modules/ui/components/button"; import { @@ -22,7 +24,6 @@ import { AI_SUGGESTED_MAPPINGS, EMAIL_SOURCE_FIELDS, FEEDBACK_RECORD_FIELDS, - MOCK_FORMBRICKS_SURVEYS, SAMPLE_CSV_COLUMNS, SAMPLE_WEBHOOK_PAYLOAD, TCreateSourceStep, @@ -30,14 +31,36 @@ import { TSourceConnection, TSourceField, TSourceType, + TUnifySurvey, parseCSVColumnsToFields, parsePayloadToFields, } from "./types"; +// Polling interval in milliseconds (3 seconds) +const WEBHOOK_POLL_INTERVAL = 3000; + +// Sample webhook payload for cURL example +const SAMPLE_CURL_PAYLOAD = { + timestamp: new Date().toISOString(), + source_type: "webhook", + field_id: "satisfaction_score", + field_type: "rating", + value_number: 4, + user_id: "user_123", + metadata: { + source: "api", + }, +}; + interface CreateSourceModalProps { open: boolean; onOpenChange: (open: boolean) => void; - onCreateSource: (source: TSourceConnection) => void; + onCreateSource: ( + source: TSourceConnection, + selectedSurveyId?: string, + selectedElementIds?: string[] + ) => void; + surveys: TUnifySurvey[]; } function getDefaultSourceName(type: TSourceType): string { @@ -57,7 +80,7 @@ function getDefaultSourceName(type: TSourceType): string { } } -export function CreateSourceModal({ open, onOpenChange, onCreateSource }: CreateSourceModalProps) { +export function CreateSourceModal({ open, onOpenChange, onCreateSource, surveys }: CreateSourceModalProps) { const [currentStep, setCurrentStep] = useState("selectType"); const [selectedType, setSelectedType] = useState(null); const [sourceName, setSourceName] = useState(""); @@ -67,7 +90,92 @@ export function CreateSourceModal({ open, onOpenChange, onCreateSource }: Create // Formbricks-specific state const [selectedSurveyId, setSelectedSurveyId] = useState(null); - const [selectedQuestionIds, setSelectedQuestionIds] = useState([]); + const [selectedElementIds, setSelectedElementIds] = useState([]); + + // Webhook listener state + const [webhookSessionId, setWebhookSessionId] = useState(null); + const [isListening, setIsListening] = useState(false); + const [webhookReceived, setWebhookReceived] = useState(false); + const [copied, setCopied] = useState(false); + const pollingIntervalRef = useRef(null); + + // Generate webhook URL + const webhookUrl = webhookSessionId + ? `${typeof window !== "undefined" ? window.location.origin : ""}/api/unify/webhook-listener/${webhookSessionId}` + : ""; + + // Poll for webhook payload + const pollForWebhook = useCallback(async () => { + if (!webhookSessionId) return; + + try { + const response = await fetch(`/api/unify/webhook-listener/${webhookSessionId}`); + + if (response.status === 200) { + const data = await response.json(); + if (data.payload) { + // Parse the received payload into source fields + const fields = parsePayloadToFields(data.payload); + setSourceFields(fields); + setWebhookReceived(true); + setIsListening(false); + toast.success("Webhook received! Fields loaded."); + + // Stop polling + if (pollingIntervalRef.current) { + clearInterval(pollingIntervalRef.current); + pollingIntervalRef.current = null; + } + } + } + // 204 means no payload yet, keep polling + } catch (error) { + console.error("Error polling for webhook:", error); + } + }, [webhookSessionId]); + + // Start/stop polling based on listening state + useEffect(() => { + if (isListening && webhookSessionId) { + // Start polling + pollingIntervalRef.current = setInterval(pollForWebhook, WEBHOOK_POLL_INTERVAL); + // Also poll immediately + pollForWebhook(); + } + + return () => { + // Cleanup polling on unmount or when listening stops + if (pollingIntervalRef.current) { + clearInterval(pollingIntervalRef.current); + pollingIntervalRef.current = null; + } + }; + }, [isListening, webhookSessionId, pollForWebhook]); + + // Generate session ID when webhook type is selected and modal opens + useEffect(() => { + if (open && selectedType === "webhook" && currentStep === "mapping" && !webhookSessionId) { + setWebhookSessionId(nanoid(21)); + setIsListening(true); + } + }, [open, selectedType, currentStep, webhookSessionId]); + + // Copy cURL command to clipboard + const handleCopyWebhookUrl = async () => { + if (!webhookUrl) return; + const curlCommand = `curl -X POST \\ + "${webhookUrl}" \\ + -H "Content-Type: application/json" \\ + -d '${JSON.stringify(SAMPLE_CURL_PAYLOAD, null, 2)}'`; + try { + await navigator.clipboard.writeText(curlCommand); + setCopied(true); + toast.success("cURL command copied to clipboard"); + setTimeout(() => setCopied(false), 2000); + } catch { + toast.error("Failed to copy"); + } + }; const resetForm = () => { setCurrentStep("selectType"); @@ -77,7 +185,16 @@ export function CreateSourceModal({ open, onOpenChange, onCreateSource }: Create setSourceFields([]); setDeriveFromAttachments(false); setSelectedSurveyId(null); - setSelectedQuestionIds([]); + setSelectedElementIds([]); + // Reset webhook state + setWebhookSessionId(null); + setIsListening(false); + setWebhookReceived(false); + setCopied(false); + if (pollingIntervalRef.current) { + clearInterval(pollingIntervalRef.current); + pollingIntervalRef.current = null; + } }; const handleOpenChange = (newOpen: boolean) => { @@ -91,7 +208,7 @@ export function CreateSourceModal({ open, onOpenChange, onCreateSource }: Create if (currentStep === "selectType" && selectedType && selectedType !== "slack") { if (selectedType === "formbricks") { // For Formbricks, use the survey name if selected - const selectedSurvey = MOCK_FORMBRICKS_SURVEYS.find((s) => s.id === selectedSurveyId); + const selectedSurvey = surveys.find((s) => s.id === selectedSurveyId); setSourceName( selectedSurvey ? `${selectedSurvey.name} Connection` : getDefaultSourceName(selectedType) ); @@ -107,21 +224,21 @@ export function CreateSourceModal({ open, onOpenChange, onCreateSource }: Create setSelectedSurveyId(surveyId); }; - const handleQuestionToggle = (questionId: string) => { - setSelectedQuestionIds((prev) => - prev.includes(questionId) ? prev.filter((id) => id !== questionId) : [...prev, questionId] + const handleElementToggle = (elementId: string) => { + setSelectedElementIds((prev) => + prev.includes(elementId) ? prev.filter((id) => id !== elementId) : [...prev, elementId] ); }; - const handleSelectAllQuestions = (surveyId: string) => { - const survey = MOCK_FORMBRICKS_SURVEYS.find((s) => s.id === surveyId); + const handleSelectAllElements = (surveyId: string) => { + const survey = surveys.find((s) => s.id === surveyId); if (survey) { - setSelectedQuestionIds(survey.questions.map((q) => q.id)); + setSelectedElementIds(survey.elements.map((e) => e.id)); } }; - const handleDeselectAllQuestions = () => { - setSelectedQuestionIds([]); + const handleDeselectAllElements = () => { + setSelectedElementIds([]); }; const handleBack = () => { @@ -135,15 +252,17 @@ export function CreateSourceModal({ open, onOpenChange, onCreateSource }: Create const handleCreateSource = () => { if (!selectedType || !sourceName.trim()) return; - // Check if all required fields are mapped - const requiredFields = FEEDBACK_RECORD_FIELDS.filter((f) => f.required); - const allRequiredMapped = requiredFields.every((field) => - mappings.some((m) => m.targetFieldId === field.id) - ); + // Check if all required fields are mapped (for non-Formbricks connectors) + if (selectedType !== "formbricks") { + const requiredFields = FEEDBACK_RECORD_FIELDS.filter((f) => f.required); + const allRequiredMapped = requiredFields.every((field) => + mappings.some((m) => m.targetFieldId === field.id) + ); - if (!allRequiredMapped) { - // For now, we'll allow creating without all required fields for POC - console.warn("Not all required fields are mapped"); + if (!allRequiredMapped) { + // For now, we'll allow creating without all required fields for POC + console.warn("Not all required fields are mapped"); + } } const newSource: TSourceConnection = { @@ -155,7 +274,12 @@ export function CreateSourceModal({ open, onOpenChange, onCreateSource }: Create updatedAt: new Date(), }; - onCreateSource(newSource); + // Pass the Formbricks-specific data if applicable + onCreateSource( + newSource, + selectedType === "formbricks" ? (selectedSurveyId ?? undefined) : undefined, + selectedType === "formbricks" ? selectedElementIds : undefined + ); resetForm(); onOpenChange(false); }; @@ -165,9 +289,9 @@ export function CreateSourceModal({ open, onOpenChange, onCreateSource }: Create mappings.some((m) => m.targetFieldId === field.id && (m.sourceFieldId || m.staticValue)) ); - // Formbricks validation - need survey and at least one question selected + // Formbricks validation - need survey and at least one element selected const isFormbricksValid = - selectedType === "formbricks" && selectedSurveyId && selectedQuestionIds.length > 0; + selectedType === "formbricks" && selectedSurveyId && selectedElementIds.length > 0; // CSV validation - need sourceFields loaded (CSV uploaded or sample loaded) const isCsvValid = selectedType === "csv" && sourceFields.length > 0; @@ -285,12 +409,13 @@ export function CreateSourceModal({ open, onOpenChange, onCreateSource }: Create
@@ -330,32 +455,128 @@ export function CreateSourceModal({ open, onOpenChange, onCreateSource }: Create /> - {/* Action buttons above scrollable area */} -
-
- - {sourceFields.length > 0 && ( - - )} -
-
+ {/* Webhook Listener UI */} + {selectedType === "webhook" && !webhookReceived && ( +
+ {/* Centered waiting indicator */} +
+ + + + + + +

Waiting for webhook...

+

Send a request to the URL below

+
-
- -
+ {/* cURL example at bottom */} +
+ +
+
+                          {`curl -X POST "${webhookUrl || "..."}" \\
+  -H "Content-Type: application/json" \\
+  -d '${JSON.stringify(SAMPLE_CURL_PAYLOAD, null, 2)}'`}
+                        
+ +
+
+
+ )} + + {/* Webhook received - show success + mapping UI */} + {selectedType === "webhook" && webhookReceived && ( +
+ {/* Success indicator */} +
+
+ +
+

Webhook received!

+

+ {sourceFields.length} fields detected. Map them below. +

+
+ + {/* AI suggest mapping button */} + {sourceFields.length > 0 && ( +
+ +
+ )} + + {/* Mapping UI */} +
+ +
+
+ )} + + {/* Non-webhook types */} + {selectedType !== "webhook" && ( + <> + {/* Action buttons */} +
+
+ + {sourceFields.length > 0 && ( + + )} +
+
+ + {/* Mapping UI */} +
+ +
+ + )} )} diff --git a/apps/web/app/(app)/environments/[environmentId]/workspace/unify/sources/components/edit-source-modal.tsx b/apps/web/app/(app)/environments/[environmentId]/workspace/unify/sources/components/edit-source-modal.tsx index df9df8846a..095491f5b5 100644 --- a/apps/web/app/(app)/environments/[environmentId]/workspace/unify/sources/components/edit-source-modal.tsx +++ b/apps/web/app/(app)/environments/[environmentId]/workspace/unify/sources/components/edit-source-modal.tsx @@ -1,6 +1,8 @@ "use client"; import { + CheckIcon, + CopyIcon, FileSpreadsheetIcon, GlobeIcon, MailIcon, @@ -8,7 +10,8 @@ import { SparklesIcon, WebhookIcon, } from "lucide-react"; -import { useEffect, useState } from "react"; +import { useCallback, useEffect, useRef, useState } from "react"; +import { toast } from "react-hot-toast"; import { Badge } from "@/modules/ui/components/badge"; import { Button } from "@/modules/ui/components/button"; import { @@ -21,6 +24,7 @@ import { } from "@/modules/ui/components/dialog"; import { Input } from "@/modules/ui/components/input"; import { Label } from "@/modules/ui/components/label"; +import { FormbricksSurveySelector } from "./formbricks-survey-selector"; import { MappingUI } from "./mapping-ui"; import { AI_SUGGESTED_MAPPINGS, @@ -32,16 +36,41 @@ import { TSourceConnection, TSourceField, TSourceType, + TUnifySurvey, parseCSVColumnsToFields, parsePayloadToFields, } from "./types"; +// Polling interval in milliseconds (3 seconds) +const WEBHOOK_POLL_INTERVAL = 3000; + +// Sample webhook payload for cURL example +const SAMPLE_CURL_PAYLOAD = { + timestamp: new Date().toISOString(), + source_type: "webhook", + field_id: "satisfaction_score", + field_type: "rating", + value_number: 4, + user_id: "user_123", + metadata: { + source: "api", + }, +}; + interface EditSourceModalProps { source: TSourceConnection | null; open: boolean; onOpenChange: (open: boolean) => void; - onUpdateSource: (source: TSourceConnection) => void; + onUpdateSource: ( + source: TSourceConnection, + selectedSurveyId?: string, + selectedElementIds?: string[] + ) => void; onDeleteSource: (sourceId: string) => void; + surveys: TUnifySurvey[]; + // For Formbricks connectors - the currently selected survey/elements + initialSurveyId?: string | null; + initialElementIds?: string[]; } function getSourceIcon(type: TSourceType) { @@ -97,6 +126,9 @@ export function EditSourceModal({ onOpenChange, onUpdateSource, onDeleteSource, + surveys, + initialSurveyId, + initialElementIds = [], }: EditSourceModalProps) { const [sourceName, setSourceName] = useState(""); const [mappings, setMappings] = useState([]); @@ -104,14 +136,120 @@ export function EditSourceModal({ const [showDeleteConfirm, setShowDeleteConfirm] = useState(false); const [deriveFromAttachments, setDeriveFromAttachments] = useState(false); + // Formbricks-specific state + const [selectedSurveyId, setSelectedSurveyId] = useState(null); + const [selectedElementIds, setSelectedElementIds] = useState([]); + + // Webhook listener state + const [isListening, setIsListening] = useState(false); + const [webhookReceived, setWebhookReceived] = useState(false); + const [copied, setCopied] = useState(false); + const pollingIntervalRef = useRef(null); + + // Permanent webhook URL using connector ID + const webhookUrl = source + ? `${typeof window !== "undefined" ? window.location.origin : ""}/api/unify/webhook/${source.id}` + : ""; + + // Poll for webhook payload using connector ID + const pollForWebhook = useCallback(async () => { + if (!source?.id) return; + + try { + const response = await fetch(`/api/unify/webhook/${source.id}`); + + if (response.status === 200) { + const data = await response.json(); + if (data.payload) { + // Parse the received payload into source fields + const fields = parsePayloadToFields(data.payload); + setSourceFields(fields); + setWebhookReceived(true); + setIsListening(false); + toast.success("Webhook received! Fields loaded."); + + // Stop polling + if (pollingIntervalRef.current) { + clearInterval(pollingIntervalRef.current); + pollingIntervalRef.current = null; + } + } + } + // 204 means no payload yet, keep polling + } catch (error) { + console.error("Error polling for webhook:", error); + } + }, [source?.id]); + + // Start/stop polling based on listening state + useEffect(() => { + if (isListening && source?.id) { + // Start polling + pollingIntervalRef.current = setInterval(pollForWebhook, WEBHOOK_POLL_INTERVAL); + // Also poll immediately + pollForWebhook(); + } + + return () => { + // Cleanup polling on unmount or when listening stops + if (pollingIntervalRef.current) { + clearInterval(pollingIntervalRef.current); + pollingIntervalRef.current = null; + } + }; + }, [isListening, source?.id, pollForWebhook]); + + // Copy webhook URL to clipboard + const handleCopyWebhookUrl = async () => { + if (!webhookUrl) return; + try { + await navigator.clipboard.writeText(webhookUrl); + setCopied(true); + toast.success("Webhook URL copied to clipboard"); + setTimeout(() => setCopied(false), 2000); + } catch { + toast.error("Failed to copy"); + } + }; + useEffect(() => { if (source) { setSourceName(source.name); setMappings(source.mappings); - setSourceFields(getInitialSourceFields(source.type)); setDeriveFromAttachments(false); + + // For Formbricks connectors, set the initial survey/element selection + if (source.type === "formbricks") { + setSelectedSurveyId(initialSurveyId ?? null); + setSelectedElementIds(initialElementIds); + setSourceFields(getInitialSourceFields(source.type)); + } else if (source.type === "webhook") { + // Webhook: if we already have mappings, show them; otherwise show listening state + if (source.mappings.length > 0) { + // Build source fields from existing mapping source IDs so the mapping UI can display them + const sourceFieldIds = new Set(); + for (const m of source.mappings) { + if (m.sourceFieldId) sourceFieldIds.add(m.sourceFieldId); + } + const fieldsFromMappings = Array.from(sourceFieldIds).map((id) => ({ + id, + name: id, + type: "string", + sampleValue: "", + })); + setSourceFields(fieldsFromMappings); + setWebhookReceived(true); + setIsListening(false); + } else { + setSourceFields([]); + setIsListening(true); + setWebhookReceived(false); + } + } else { + setSourceFields(getInitialSourceFields(source.type)); + } } - }, [source]); + }, [source, initialSurveyId, initialElementIds]); const resetForm = () => { setSourceName(""); @@ -119,6 +257,16 @@ export function EditSourceModal({ setSourceFields([]); setShowDeleteConfirm(false); setDeriveFromAttachments(false); + setSelectedSurveyId(null); + setSelectedElementIds([]); + // Reset webhook state + setIsListening(false); + setWebhookReceived(false); + setCopied(false); + if (pollingIntervalRef.current) { + clearInterval(pollingIntervalRef.current); + pollingIntervalRef.current = null; + } }; const handleOpenChange = (newOpen: boolean) => { @@ -128,6 +276,28 @@ export function EditSourceModal({ onOpenChange(newOpen); }; + // Formbricks handlers + const handleSurveySelect = (surveyId: string | null) => { + setSelectedSurveyId(surveyId); + }; + + const handleElementToggle = (elementId: string) => { + setSelectedElementIds((prev) => + prev.includes(elementId) ? prev.filter((id) => id !== elementId) : [...prev, elementId] + ); + }; + + const handleSelectAllElements = (surveyId: string) => { + const survey = surveys.find((s) => s.id === surveyId); + if (survey) { + setSelectedElementIds(survey.elements.map((e) => e.id)); + } + }; + + const handleDeselectAllElements = () => { + setSelectedElementIds([]); + }; + const handleUpdateSource = () => { if (!source || !sourceName.trim()) return; @@ -138,7 +308,12 @@ export function EditSourceModal({ updatedAt: new Date(), }; - onUpdateSource(updatedSource); + // For Formbricks, pass the survey/element selection + if (source.type === "formbricks") { + onUpdateSource(updatedSource, selectedSurveyId ?? undefined, selectedElementIds); + } else { + onUpdateSource(updatedSource); + } handleOpenChange(false); }; @@ -242,33 +417,157 @@ export function EditSourceModal({ /> - {/* Action buttons above scrollable area */} -
-
- - {sourceFields.length > 0 && ( - - )} + {source.type === "formbricks" ? ( + /* Formbricks Survey Selector UI */ +
+
-
+ ) : ( + /* Other source types - Mapping UI */ + <> + {/* Webhook Listener UI - Waiting state */} + {source.type === "webhook" && !webhookReceived && ( +
+ {/* Permanent Webhook URL */} +
+
+
+ +
+
+

Your Webhook URL

+

+ This is your permanent webhook endpoint. Use it in your integrations. +

+
+ + {webhookUrl || "Loading..."} + + +
+
+
+
- {/* Mapping UI */} -
- -
+ {/* Centered waiting indicator */} +
+ + + + + + +

Listening for test payload...

+

Send a request to update field mappings

+
+ + {/* cURL example */} +
+ +
+
+                        {`curl -X POST "${webhookUrl || "..."}" \\
+  -H "Content-Type: application/json" \\
+  -d '${JSON.stringify(SAMPLE_CURL_PAYLOAD, null, 2)}'`}
+                      
+
+
+
+ )} + + {/* Webhook configured - show mapping UI */} + {source.type === "webhook" && webhookReceived && ( +
+ {/* Webhook URL + copy (when already configured) */} +
+ Webhook URL: + + {webhookUrl || "..."} + + +
+ + {/* AI suggest mapping button */} + {sourceFields.length > 0 && ( +
+ +
+ )} + + {/* Mapping UI */} +
+ +
+
+ )} + + {/* Non-webhook types */} + {source.type !== "webhook" && ( + <> + {/* Action buttons */} +
+
+ + {sourceFields.length > 0 && ( + + )} +
+
+ + {/* Mapping UI */} +
+ +
+ + )} + + )}
@@ -289,7 +588,12 @@ export function EditSourceModal({ )} - diff --git a/apps/web/app/(app)/environments/[environmentId]/workspace/unify/sources/components/formbricks-survey-selector.tsx b/apps/web/app/(app)/environments/[environmentId]/workspace/unify/sources/components/formbricks-survey-selector.tsx index e1811127c5..f0666a1605 100644 --- a/apps/web/app/(app)/environments/[environmentId]/workspace/unify/sources/components/formbricks-survey-selector.tsx +++ b/apps/web/app/(app)/environments/[environmentId]/workspace/unify/sources/components/formbricks-survey-selector.tsx @@ -12,36 +12,31 @@ import { } from "lucide-react"; import { useState } from "react"; import { Badge } from "@/modules/ui/components/badge"; -import { - MOCK_FORMBRICKS_SURVEYS, - TFormbricksSurvey, - TFormbricksSurveyQuestion, - getQuestionTypeLabel, -} from "./types"; +import { TUnifySurvey, getElementTypeLabel } from "./types"; interface FormbricksSurveySelectorProps { + surveys: TUnifySurvey[]; selectedSurveyId: string | null; - selectedQuestionIds: string[]; + selectedElementIds: string[]; onSurveySelect: (surveyId: string | null) => void; - onQuestionToggle: (questionId: string) => void; - onSelectAllQuestions: (surveyId: string) => void; - onDeselectAllQuestions: () => void; + onElementToggle: (elementId: string) => void; + onSelectAllElements: (surveyId: string) => void; + onDeselectAllElements: () => void; } -function getQuestionIcon(type: TFormbricksSurveyQuestion["type"]) { +function getElementIcon(type: string) { switch (type) { case "openText": return ; case "rating": case "nps": - case "csat": return ; default: return ; } } -function getStatusBadge(status: TFormbricksSurvey["status"]) { +function getStatusBadge(status: TUnifySurvey["status"]) { switch (status) { case "active": return ; @@ -57,31 +52,31 @@ function getStatusBadge(status: TFormbricksSurvey["status"]) { } export function FormbricksSurveySelector({ + surveys, selectedSurveyId, - selectedQuestionIds, + selectedElementIds, onSurveySelect, - onQuestionToggle, - onSelectAllQuestions, - onDeselectAllQuestions, + onElementToggle, + onSelectAllElements, + onDeselectAllElements, }: FormbricksSurveySelectorProps) { const [expandedSurveyId, setExpandedSurveyId] = useState(null); - const selectedSurvey = MOCK_FORMBRICKS_SURVEYS.find((s) => s.id === selectedSurveyId); + const selectedSurvey = surveys.find((s) => s.id === selectedSurveyId); - const handleSurveyClick = (survey: TFormbricksSurvey) => { + const handleSurveyClick = (survey: TUnifySurvey) => { if (selectedSurveyId === survey.id) { // Toggle expand/collapse if already selected setExpandedSurveyId(expandedSurveyId === survey.id ? null : survey.id); } else { // Select the survey and expand it onSurveySelect(survey.id); - onDeselectAllQuestions(); + onDeselectAllElements(); setExpandedSurveyId(survey.id); } }; - const allQuestionsSelected = - selectedSurvey && selectedQuestionIds.length === selectedSurvey.questions.length; + const allElementsSelected = selectedSurvey && selectedElementIds.length === selectedSurvey.elements.length; return (
@@ -89,74 +84,85 @@ export function FormbricksSurveySelector({

Select Survey

- {MOCK_FORMBRICKS_SURVEYS.map((survey) => { - const isSelected = selectedSurveyId === survey.id; - const isExpanded = expandedSurveyId === survey.id; + {surveys.length === 0 ? ( +
+

No surveys found in this environment

+
+ ) : ( + surveys.map((survey) => { + const isSelected = selectedSurveyId === survey.id; + const isExpanded = expandedSurveyId === survey.id; - return ( -
-
- {isSelected && } - -
- ); - })} +
+
+ {survey.name} + {getStatusBadge(survey.status)} +
+

+ {survey.elements.length} elements + {survey.responseCount > 0 && ` · ${survey.responseCount.toLocaleString()} responses`} +

+
+ {isSelected && } + +
+ ); + }) + )}
- {/* Right: Question Selection */} + {/* Right: Element Selection */}
-

Select Questions

+

Select Elements

{selectedSurvey && ( )}
{!selectedSurvey ? (
-

Select a survey to see its questions

+

Select a survey to see its elements

+
+ ) : selectedSurvey.elements.length === 0 ? ( +
+

This survey has no question elements

) : (
- {selectedSurvey.questions.map((question) => { - const isSelected = selectedQuestionIds.includes(question.id); + {selectedSurvey.elements.map((element) => { + const isSelected = selectedElementIds.includes(element.id); return (
-
{getQuestionIcon(question.type)}
+
{getElementIcon(element.type)}
-

{question.headline}

+

{element.headline}

- {getQuestionTypeLabel(question.type)} - {question.required && ( + {getElementTypeLabel(element.type)} + {element.required && ( Required @@ -184,12 +190,12 @@ export function FormbricksSurveySelector({ ); })} - {selectedQuestionIds.length > 0 && ( + {selectedElementIds.length > 0 && (

- {selectedQuestionIds.length} question - {selectedQuestionIds.length !== 1 ? "s" : ""} selected. Each response to these questions - will create a FeedbackRecord in the Hub. + {selectedElementIds.length} element + {selectedElementIds.length !== 1 ? "s" : ""} selected. Each response to these elements will + create a FeedbackRecord in the Hub.

)} diff --git a/apps/web/app/(app)/environments/[environmentId]/workspace/unify/sources/components/sources-page-client.tsx b/apps/web/app/(app)/environments/[environmentId]/workspace/unify/sources/components/sources-page-client.tsx index 4bfb74223b..fe5b1d6cde 100644 --- a/apps/web/app/(app)/environments/[environmentId]/workspace/unify/sources/components/sources-page-client.tsx +++ b/apps/web/app/(app)/environments/[environmentId]/workspace/unify/sources/components/sources-page-client.tsx @@ -1,33 +1,280 @@ "use client"; -import { useState } from "react"; +import { useCallback, useEffect, useState } from "react"; +import { toast } from "react-hot-toast"; +import { TConnectorWithMappings, TFormbricksConnector } from "@formbricks/types/connector"; +import { + createConnectorAction, + deleteConnectorAction, + getConnectorsWithMappingsAction, + syncFieldMappingsAction, + syncFormbricksMappingsAction, + updateConnectorAction, +} from "@/lib/connector/actions"; import { PageContentWrapper } from "@/modules/ui/components/page-content-wrapper"; import { PageHeader } from "@/modules/ui/components/page-header"; import { UnifyConfigNavigation } from "../../components/UnifyConfigNavigation"; +import { getSurveysForUnifyAction } from "../actions"; import { CreateSourceModal } from "./create-source-modal"; import { EditSourceModal } from "./edit-source-modal"; import { SourcesTable } from "./sources-table"; -import { TSourceConnection } from "./types"; +import { TSourceConnection, TUnifySurvey } from "./types"; +import { elementTypeToHubFieldType } from "./types"; interface SourcesSectionProps { environmentId: string; } +// Transform connector from database to TSourceConnection for UI +function connectorToSourceConnection(connector: TConnectorWithMappings): TSourceConnection { + // For webhook (and other field-mapping connectors), include field mappings + const mappings = + connector.type === "webhook" && "fieldMappings" in connector && connector.fieldMappings?.length + ? connector.fieldMappings.map((m) => ({ + sourceFieldId: m.sourceFieldId, + targetFieldId: m.targetFieldId, + staticValue: m.staticValue ?? undefined, + })) + : []; + + return { + id: connector.id, + name: connector.name, + type: connector.type as TSourceConnection["type"], + mappings, + createdAt: connector.createdAt, + updatedAt: connector.updatedAt, + }; +} + +// Get Formbricks mapping data from a connector +function getFormbricksMappingData(connector: TConnectorWithMappings): { + surveyId: string | null; + elementIds: string[]; +} { + if (connector.type !== "formbricks" || !("formbricksMappings" in connector)) { + return { surveyId: null, elementIds: [] }; + } + + const formbricksConnector = connector as TFormbricksConnector; + const mappings = formbricksConnector.formbricksMappings || []; + + if (mappings.length === 0) { + return { surveyId: null, elementIds: [] }; + } + + // All mappings for a Formbricks connector should be for the same survey (for now) + const surveyId = mappings[0].surveyId; + const elementIds = mappings.map((m) => m.elementId); + + return { surveyId, elementIds }; +} + export function SourcesSection({ environmentId }: SourcesSectionProps) { const [sources, setSources] = useState([]); + const [connectorsMap, setConnectorsMap] = useState>(new Map()); + const [surveys, setSurveys] = useState([]); const [isCreateModalOpen, setIsCreateModalOpen] = useState(false); const [editingSource, setEditingSource] = useState(null); + const [isLoading, setIsLoading] = useState(true); - const handleCreateSource = (source: TSourceConnection) => { - setSources((prev) => [...prev, source]); + // Fetch surveys and connectors on mount + const fetchData = useCallback(async () => { + setIsLoading(true); + try { + // Fetch surveys and connectors in parallel + const [surveysResult, connectorsResult] = await Promise.all([ + getSurveysForUnifyAction({ environmentId }), + getConnectorsWithMappingsAction({ environmentId }), + ]); + + if (surveysResult?.data) { + setSurveys(surveysResult.data); + } + + if (connectorsResult?.data) { + setSources(connectorsResult.data.map(connectorToSourceConnection)); + // Store the full connector data for editing + const newConnectorsMap = new Map(); + connectorsResult.data.forEach((connector) => { + newConnectorsMap.set(connector.id, connector); + }); + setConnectorsMap(newConnectorsMap); + } + } catch (error) { + console.error("Failed to fetch data:", error); + toast.error("Failed to load data"); + } finally { + setIsLoading(false); + } + }, [environmentId]); + + useEffect(() => { + fetchData(); + }, [fetchData]); + + const handleCreateSource = async ( + source: TSourceConnection, + selectedSurveyId?: string, + selectedElementIds?: string[] + ) => { + try { + // Create the connector in the database + const result = await createConnectorAction({ + environmentId, + connectorInput: { + name: source.name, + type: source.type, + }, + }); + + if (!result?.data) { + toast.error("Failed to create connector"); + return; + } + + const connectorResult = result.data; + if ("error" in connectorResult && connectorResult.error) { + toast.error(connectorResult.error.message || "Failed to create connector"); + return; + } + + const connector = "data" in connectorResult ? connectorResult.data : connectorResult; + if (!connector || !connector.id) { + toast.error("Failed to create connector - invalid response"); + return; + } + + // If it's a Formbricks connector, create the mappings + if ( + source.type === "formbricks" && + selectedSurveyId && + selectedElementIds && + selectedElementIds.length > 0 + ) { + // Get the survey to determine element types + const selectedSurvey = surveys.find((s) => s.id === selectedSurveyId); + if (selectedSurvey) { + const mappings = selectedElementIds.map((elementId) => { + const element = selectedSurvey.elements.find((e) => e.id === elementId); + return { + surveyId: selectedSurveyId, + elementId, + hubFieldType: elementTypeToHubFieldType(element?.type || "openText"), + }; + }); + + await syncFormbricksMappingsAction({ + connectorId: connector.id, + mappings, + }); + } + } else if (source.type !== "formbricks" && source.mappings.length > 0) { + // For other connector types, save field mappings + const fieldMappings = source.mappings.map((m) => ({ + sourceFieldId: m.sourceFieldId || "", + targetFieldId: m.targetFieldId, + staticValue: m.staticValue, + })); + + await syncFieldMappingsAction({ + connectorId: connector.id, + mappings: fieldMappings, + }); + } + + // Refresh the list + await fetchData(); + toast.success("Connector created successfully"); + } catch (error) { + console.error("Failed to create connector:", error); + toast.error("Failed to create connector"); + } }; - const handleUpdateSource = (updatedSource: TSourceConnection) => { - setSources((prev) => prev.map((s) => (s.id === updatedSource.id ? updatedSource : s))); + const handleUpdateSource = async ( + updatedSource: TSourceConnection, + selectedSurveyId?: string, + selectedElementIds?: string[] + ) => { + try { + // Update the connector name + const result = await updateConnectorAction({ + connectorId: updatedSource.id, + connectorInput: { + name: updatedSource.name, + }, + }); + + if (!result?.data) { + toast.error("Failed to update connector"); + return; + } + + // If it's a Formbricks connector, update the mappings + if ( + updatedSource.type === "formbricks" && + selectedSurveyId && + selectedElementIds && + selectedElementIds.length > 0 + ) { + const selectedSurvey = surveys.find((s) => s.id === selectedSurveyId); + if (selectedSurvey) { + const mappings = selectedElementIds.map((elementId) => { + const element = selectedSurvey.elements.find((e) => e.id === elementId); + return { + surveyId: selectedSurveyId, + elementId, + hubFieldType: elementTypeToHubFieldType(element?.type || "openText"), + }; + }); + + await syncFormbricksMappingsAction({ + connectorId: updatedSource.id, + mappings, + }); + } + } else if (updatedSource.type !== "formbricks" && updatedSource.mappings.length > 0) { + // For other connector types, save field mappings + const fieldMappings = updatedSource.mappings.map((m) => ({ + sourceFieldId: m.sourceFieldId || "", + targetFieldId: m.targetFieldId, + staticValue: m.staticValue, + })); + + await syncFieldMappingsAction({ + connectorId: updatedSource.id, + mappings: fieldMappings, + }); + } + + // Refresh the list + await fetchData(); + toast.success("Connector updated successfully"); + } catch (error) { + console.error("Failed to update connector:", error); + toast.error("Failed to update connector"); + } }; - const handleDeleteSource = (sourceId: string) => { - setSources((prev) => prev.filter((s) => s.id !== sourceId)); + const handleDeleteSource = async (sourceId: string) => { + try { + const result = await deleteConnectorAction({ + connectorId: sourceId, + }); + + if (!result?.data) { + toast.error("Failed to delete connector"); + return; + } + + // Refresh the list + await fetchData(); + toast.success("Connector deleted successfully"); + } catch (error) { + console.error("Failed to delete connector:", error); + toast.error("Failed to delete connector"); + } }; const handleSourceClick = (source: TSourceConnection) => { @@ -43,13 +290,14 @@ export function SourcesSection({ environmentId }: SourcesSectionProps) { open={isCreateModalOpen} onOpenChange={setIsCreateModalOpen} onCreateSource={handleCreateSource} + surveys={surveys} /> }>
- +
!open && setEditingSource(null)} onUpdateSource={handleUpdateSource} onDeleteSource={handleDeleteSource} + surveys={surveys} + initialSurveyId={ + editingSource && connectorsMap.get(editingSource.id) + ? getFormbricksMappingData(connectorsMap.get(editingSource.id)!).surveyId + : null + } + initialElementIds={ + editingSource && connectorsMap.get(editingSource.id) + ? getFormbricksMappingData(connectorsMap.get(editingSource.id)!).elementIds + : [] + } /> ); diff --git a/apps/web/app/(app)/environments/[environmentId]/workspace/unify/sources/components/sources-table.tsx b/apps/web/app/(app)/environments/[environmentId]/workspace/unify/sources/components/sources-table.tsx index ab9f12c870..1bd5d752a7 100644 --- a/apps/web/app/(app)/environments/[environmentId]/workspace/unify/sources/components/sources-table.tsx +++ b/apps/web/app/(app)/environments/[environmentId]/workspace/unify/sources/components/sources-table.tsx @@ -1,14 +1,16 @@ "use client"; +import { Loader2Icon } from "lucide-react"; import { SourcesTableDataRow } from "./sources-table-data-row"; import { TSourceConnection } from "./types"; interface SourcesTableProps { sources: TSourceConnection[]; onSourceClick: (source: TSourceConnection) => void; + isLoading?: boolean; } -export function SourcesTable({ sources, onSourceClick }: SourcesTableProps) { +export function SourcesTable({ sources, onSourceClick, isLoading = false }: SourcesTableProps) { return (
@@ -17,7 +19,11 @@ export function SourcesTable({ sources, onSourceClick }: SourcesTableProps) {
Mappings
Created
- {sources.length === 0 ? ( + {isLoading ? ( +
+ +
+ ) : sources.length === 0 ? (

No sources connected yet. Add a source to get started.

diff --git a/apps/web/app/(app)/environments/[environmentId]/workspace/unify/sources/components/types.ts b/apps/web/app/(app)/environments/[environmentId]/workspace/unify/sources/components/types.ts index 7f86d09ba5..a0377c256c 100644 --- a/apps/web/app/(app)/environments/[environmentId]/workspace/unify/sources/components/types.ts +++ b/apps/web/app/(app)/environments/[environmentId]/workspace/unify/sources/components/types.ts @@ -49,7 +49,7 @@ export const SOURCE_OPTIONS: TSourceOption[] = [ }, ]; -// Formbricks Survey types for survey selection +// Formbricks Survey types for survey selection (legacy - kept for backwards compatibility) export interface TFormbricksSurveyQuestion { id: string; type: "openText" | "rating" | "nps" | "csat" | "multipleChoice" | "checkbox" | "date"; @@ -66,6 +66,23 @@ export interface TFormbricksSurvey { createdAt: Date; } +// New Unify Survey types that work with real survey data +export interface TUnifySurveyElement { + id: string; + type: string; // Element type from TSurveyElementTypeEnum + headline: string; + required: boolean; +} + +export interface TUnifySurvey { + id: string; + name: string; + status: "draft" | "active" | "paused" | "completed"; + responseCount: number; + elements: TUnifySurveyElement[]; + createdAt: Date; +} + // Mock surveys for POC export const MOCK_FORMBRICKS_SURVEYS: TFormbricksSurvey[] = [ { @@ -170,7 +187,7 @@ export const MOCK_FORMBRICKS_SURVEYS: TFormbricksSurvey[] = [ }, ]; -// Helper to get question type label +// Helper to get question type label (legacy) export function getQuestionTypeLabel(type: TFormbricksSurveyQuestion["type"]): string { switch (type) { case "openText": @@ -192,7 +209,45 @@ export function getQuestionTypeLabel(type: TFormbricksSurveyQuestion["type"]): s } } -// Helper to map question type to FeedbackRecord field_type +// Helper to get element type label (for real survey elements) +export function getElementTypeLabel(type: string): string { + switch (type) { + case "openText": + return "Open Text"; + case "rating": + return "Rating"; + case "nps": + return "NPS"; + case "multipleChoiceSingle": + return "Single Choice"; + case "multipleChoiceMulti": + return "Multiple Choice"; + case "date": + return "Date"; + case "consent": + return "Consent"; + case "matrix": + return "Matrix"; + case "ranking": + return "Ranking"; + case "pictureSelection": + return "Picture Selection"; + case "contactInfo": + return "Contact Info"; + case "address": + return "Address"; + case "fileUpload": + return "File Upload"; + case "cal": + return "Calendar"; + case "cta": + return "CTA"; + default: + return type; + } +} + +// Helper to map question type to FeedbackRecord field_type (legacy) export function questionTypeToFieldType(type: TFormbricksSurveyQuestion["type"]): TFeedbackRecordFieldType { switch (type) { case "openText": @@ -213,6 +268,38 @@ export function questionTypeToFieldType(type: TFormbricksSurveyQuestion["type"]) } } +// Helper to map element type to Hub field_type (for real survey elements) +export function elementTypeToHubFieldType(type: string): TFeedbackRecordFieldType { + switch (type) { + case "openText": + return "text"; + case "rating": + return "rating"; + case "nps": + return "nps"; + case "multipleChoiceSingle": + case "multipleChoiceMulti": + return "categorical"; + case "date": + return "date"; + case "consent": + return "boolean"; + case "matrix": + case "ranking": + case "pictureSelection": + return "categorical"; + case "contactInfo": + case "address": + case "fileUpload": + case "cal": + return "text"; + case "cta": + return "boolean"; + default: + return "text"; + } +} + // Field mapping types - supports both source field mapping and static values export interface TFieldMapping { targetFieldId: string; @@ -440,13 +527,73 @@ export function parsePayloadToFields(payload: Record): TSourceF const fieldId = prefix ? `${prefix}.${key}` : key; const fieldName = prefix ? `${prefix}.${key}` : key; - if (value !== null && typeof value === "object" && !Array.isArray(value)) { + if (value === null || value === undefined) { + fields.push({ + id: fieldId, + name: fieldName, + type: "string", + sampleValue: String(value), + }); + } else if (Array.isArray(value)) { + // Handle arrays - expand first few elements with index notation + if (value.length === 0) { + fields.push({ + id: fieldId, + name: fieldName, + type: "array", + sampleValue: "[]", + }); + } else { + // Expand up to first 3 array elements + const maxItems = Math.min(value.length, 3); + for (let i = 0; i < maxItems; i++) { + const item = value[i]; + const itemPrefix = `${fieldId}[${i}]`; + + if (item !== null && typeof item === "object" && !Array.isArray(item)) { + // Array of objects - expand the object properties + extractFields(item as Record, itemPrefix); + } else if (Array.isArray(item)) { + // Nested array + fields.push({ + id: itemPrefix, + name: itemPrefix, + type: "array", + sampleValue: `[${item.length} items]`, + }); + } else { + // Primitive array element + let type = "string"; + if (typeof item === "number") type = "number"; + if (typeof item === "boolean") type = "boolean"; + + fields.push({ + id: itemPrefix, + name: itemPrefix, + type, + sampleValue: String(item), + }); + } + } + + // If there are more items, indicate that + if (value.length > 3) { + fields.push({ + id: `${fieldId}[...]`, + name: `${fieldId}[...]`, + type: "info", + sampleValue: `+${value.length - 3} more items`, + }); + } + } + } else if (typeof value === "object") { + // Handle nested objects extractFields(value as Record, fieldId); } else { + // Handle primitives let type = "string"; if (typeof value === "number") type = "number"; if (typeof value === "boolean") type = "boolean"; - if (Array.isArray(value)) type = "array"; fields.push({ id: fieldId, diff --git a/apps/web/app/api/(internal)/pipeline/route.ts b/apps/web/app/api/(internal)/pipeline/route.ts index 7eaaeb1bcd..a897d1a8c8 100644 --- a/apps/web/app/api/(internal)/pipeline/route.ts +++ b/apps/web/app/api/(internal)/pipeline/route.ts @@ -8,6 +8,7 @@ import { sendTelemetryEvents } from "@/app/api/(internal)/pipeline/lib/telemetry import { ZPipelineInput } from "@/app/api/(internal)/pipeline/types/pipelines"; import { responses } from "@/app/lib/api/response"; import { transformErrorToDetails } from "@/app/lib/api/validator"; +import { handleConnectorPipeline } from "@/lib/connector/pipeline-handler"; import { CRON_SECRET } from "@/lib/constants"; import { generateStandardWebhookSignature } from "@/lib/crypto"; import { getIntegrations } from "@/lib/integration/service"; @@ -138,6 +139,14 @@ export const POST = async (request: Request) => { }); if (event === "responseFinished") { + // Handle connector pipeline for Hub integration (only on responseFinished to avoid duplicates) + // This sends response data to the Hub for configured connectors + try { + await handleConnectorPipeline(response, survey, environmentId); + } catch (error) { + // Log but don't throw - connector failures shouldn't break the main pipeline + logger.error({ error, surveyId, responseId: response.id }, "Connector pipeline failed"); + } // Fetch integrations and responseCount in parallel const [integrations, responseCount] = await Promise.all([ getIntegrations(environmentId), diff --git a/apps/web/app/api/unify/webhook-listener/[sessionId]/route.ts b/apps/web/app/api/unify/webhook-listener/[sessionId]/route.ts new file mode 100644 index 0000000000..314be46b73 --- /dev/null +++ b/apps/web/app/api/unify/webhook-listener/[sessionId]/route.ts @@ -0,0 +1,102 @@ +import { NextRequest, NextResponse } from "next/server"; +import { logger } from "@formbricks/logger"; +import { getPayload, storePayload } from "@/lib/connector/webhook-listener-store"; + +// Maximum payload size in bytes (100KB) +const MAX_PAYLOAD_SIZE = 100 * 1024; + +/** + * POST /api/unify/webhook-listener/[sessionId] + * Receive an incoming webhook payload for testing + */ +export async function POST( + request: NextRequest, + props: { params: Promise<{ sessionId: string }> } +): Promise { + const { sessionId } = await props.params; + + if (!sessionId || sessionId.length < 10) { + return NextResponse.json({ error: "Invalid session ID" }, { status: 400 }); + } + + try { + // Check content length + const contentLength = request.headers.get("content-length"); + if (contentLength && parseInt(contentLength, 10) > MAX_PAYLOAD_SIZE) { + return NextResponse.json({ error: "Payload too large. Maximum size is 100KB." }, { status: 413 }); + } + + // Parse the JSON payload + let payload: Record; + try { + payload = await request.json(); + } catch { + return NextResponse.json({ error: "Invalid JSON payload" }, { status: 400 }); + } + + // Validate payload is an object + if (!payload || typeof payload !== "object" || Array.isArray(payload)) { + return NextResponse.json({ error: "Payload must be a JSON object" }, { status: 400 }); + } + + // Store the payload + const stored = storePayload(sessionId, payload); + if (!stored) { + return NextResponse.json({ error: "Failed to store payload. It may be too large." }, { status: 413 }); + } + + logger.info({ sessionId }, "Webhook payload received for session"); + + return NextResponse.json({ success: true, message: "Webhook received successfully" }, { status: 200 }); + } catch (error) { + logger.error({ error, sessionId }, "Error processing webhook payload"); + return NextResponse.json({ error: "Internal server error" }, { status: 500 }); + } +} + +/** + * GET /api/unify/webhook-listener/[sessionId] + * Poll for a received webhook payload + */ +export async function GET( + _request: NextRequest, + props: { params: Promise<{ sessionId: string }> } +): Promise { + const { sessionId } = await props.params; + + if (!sessionId || sessionId.length < 10) { + return NextResponse.json({ error: "Invalid session ID" }, { status: 400 }); + } + + try { + // Get the payload (and clear it from the store) + const payload = getPayload(sessionId, true); + + if (!payload) { + // No payload received yet - return 204 No Content + return new NextResponse(null, { status: 204 }); + } + + // Return the payload + return NextResponse.json({ payload }, { status: 200 }); + } catch (error) { + logger.error({ error, sessionId }, "Error retrieving webhook payload"); + return NextResponse.json({ error: "Internal server error" }, { status: 500 }); + } +} + +/** + * OPTIONS /api/unify/webhook-listener/[sessionId] + * Handle CORS preflight requests + */ +export async function OPTIONS(): Promise { + return new NextResponse(null, { + status: 204, + headers: { + "Access-Control-Allow-Origin": "*", + "Access-Control-Allow-Methods": "GET, POST, OPTIONS", + "Access-Control-Allow-Headers": "Content-Type, Authorization", + "Access-Control-Max-Age": "86400", + }, + }); +} diff --git a/apps/web/app/api/unify/webhook/[connectorId]/route.ts b/apps/web/app/api/unify/webhook/[connectorId]/route.ts new file mode 100644 index 0000000000..b05acf3baa --- /dev/null +++ b/apps/web/app/api/unify/webhook/[connectorId]/route.ts @@ -0,0 +1,276 @@ +import { NextRequest, NextResponse } from "next/server"; +import { prisma } from "@formbricks/database"; +import { logger } from "@formbricks/logger"; +import { TCreateFeedbackRecordInput, createFeedbackRecordsBatch } from "@/lib/connector/hub-client"; +import { getPayload, storePayload } from "@/lib/connector/webhook-listener-store"; + +// Maximum payload size in bytes (100KB) +const MAX_PAYLOAD_SIZE = 100 * 1024; + +/** + * POST /api/unify/webhook/[connectorId] + * + * Receive incoming webhook payloads for a connector. + * - If connector has no field mappings yet: Store payload for setup UI + * - If connector has field mappings: Process and send to Hub + */ +export async function POST( + request: NextRequest, + props: { params: Promise<{ connectorId: string }> } +): Promise { + const { connectorId } = await props.params; + + if (!connectorId) { + return NextResponse.json({ error: "Connector ID required" }, { status: 400 }); + } + + try { + // Check content length + const contentLength = request.headers.get("content-length"); + if (contentLength && parseInt(contentLength, 10) > MAX_PAYLOAD_SIZE) { + return NextResponse.json({ error: "Payload too large. Maximum size is 100KB." }, { status: 413 }); + } + + // Parse the JSON payload + let payload: Record; + try { + payload = await request.json(); + } catch { + return NextResponse.json({ error: "Invalid JSON payload" }, { status: 400 }); + } + + // Validate payload is an object + if (!payload || typeof payload !== "object" || Array.isArray(payload)) { + return NextResponse.json({ error: "Payload must be a JSON object" }, { status: 400 }); + } + + // Fetch connector with its field mappings + const connector = await prisma.connector.findUnique({ + where: { id: connectorId }, + include: { + fieldMappings: true, + }, + }); + + if (!connector) { + return NextResponse.json({ error: "Connector not found" }, { status: 404 }); + } + + if (connector.type !== "webhook") { + return NextResponse.json({ error: "This endpoint is only for webhook connectors" }, { status: 400 }); + } + + if (connector.status !== "active") { + return NextResponse.json({ error: "Connector is not active" }, { status: 400 }); + } + + // Check if connector has field mappings configured + const hasMappings = connector.fieldMappings && connector.fieldMappings.length > 0; + + if (!hasMappings) { + // Setup phase: Store payload for UI to fetch + const stored = storePayload(connectorId, payload); + if (!stored) { + return NextResponse.json({ error: "Failed to store payload. It may be too large." }, { status: 413 }); + } + + logger.info({ connectorId }, "Webhook payload stored for setup"); + return NextResponse.json( + { success: true, message: "Payload received for setup", mode: "setup" }, + { status: 200 } + ); + } + + // Production phase: Transform and send to Hub + const feedbackRecords: TCreateFeedbackRecordInput[] = []; + + // Build a single feedback record from the payload using field mappings + const record: Record = {}; + + for (const mapping of connector.fieldMappings) { + let value: unknown; + + if (mapping.staticValue) { + // Use static value + value = mapping.staticValue; + // Handle special static values + if (value === "$now") { + value = new Date().toISOString(); + } + } else { + // Get value from payload using dot notation path + value = getNestedValue(payload, mapping.sourceFieldId); + } + + if (value !== undefined && value !== null) { + record[mapping.targetFieldId] = value; + } + } + + // Ensure required fields have defaults + if (!record.source_type) { + record.source_type = "webhook"; + } + if (!record.collected_at) { + record.collected_at = new Date().toISOString(); + } + if (!record.field_type) { + record.field_type = "text"; + } + if (!record.field_id) { + record.field_id = connectorId; + } + + // Add environment as tenant + record.tenant_id = connector.environmentId; + + feedbackRecords.push(record as TCreateFeedbackRecordInput); + + // Send to Hub + const { results } = await createFeedbackRecordsBatch(feedbackRecords); + + const successCount = results.filter((r) => r.data && !r.error).length; + const errorCount = results.filter((r) => r.error).length; + + if (errorCount > 0) { + logger.error( + { connectorId, errors: results.filter((r) => r.error).map((r) => r.error) }, + "Some feedback records failed to create" + ); + } + + // Update connector last sync time + await prisma.connector.update({ + where: { id: connectorId }, + data: { + lastSyncAt: new Date(), + errorMessage: errorCount > 0 ? `${errorCount} records failed` : null, + }, + }); + + logger.info({ connectorId, successCount, errorCount }, "Webhook processed"); + + return NextResponse.json( + { + success: true, + message: "Webhook processed", + mode: "production", + records_created: successCount, + records_failed: errorCount, + }, + { status: 200 } + ); + } catch (error) { + logger.error({ error, connectorId }, "Error processing webhook"); + return NextResponse.json({ error: "Internal server error" }, { status: 500 }); + } +} + +/** + * GET /api/unify/webhook/[connectorId] + * + * Poll for a received webhook payload during setup phase. + * Returns the stored payload if one exists. + */ +export async function GET( + _request: NextRequest, + props: { params: Promise<{ connectorId: string }> } +): Promise { + const { connectorId } = await props.params; + + if (!connectorId) { + return NextResponse.json({ error: "Connector ID required" }, { status: 400 }); + } + + try { + // Verify connector exists + const connector = await prisma.connector.findUnique({ + where: { id: connectorId }, + select: { id: true, type: true }, + }); + + if (!connector) { + return NextResponse.json({ error: "Connector not found" }, { status: 404 }); + } + + if (connector.type !== "webhook") { + return NextResponse.json({ error: "This endpoint is only for webhook connectors" }, { status: 400 }); + } + + // Get the stored payload (and clear it) + const payload = getPayload(connectorId, true); + + if (!payload) { + // No payload received yet + return new NextResponse(null, { status: 204 }); + } + + return NextResponse.json({ payload }, { status: 200 }); + } catch (error) { + logger.error({ error, connectorId }, "Error retrieving webhook payload"); + return NextResponse.json({ error: "Internal server error" }, { status: 500 }); + } +} + +/** + * OPTIONS /api/unify/webhook/[connectorId] + * Handle CORS preflight requests + */ +export async function OPTIONS(): Promise { + return new NextResponse(null, { + status: 204, + headers: { + "Access-Control-Allow-Origin": "*", + "Access-Control-Allow-Methods": "GET, POST, OPTIONS", + "Access-Control-Allow-Headers": "Content-Type, Authorization", + "Access-Control-Max-Age": "86400", + }, + }); +} + +/** + * Helper to get a nested value from an object using dot notation and array brackets + * Supports paths like: "form_response.answers[0].text" or "data.items[2].name" + */ +function getNestedValue(obj: Record, path: string): unknown { + let current: unknown = obj; + + // Split by dots, but we need to handle array notation within each segment + const segments = path.split("."); + + for (const segment of segments) { + if (current === null || current === undefined) { + return undefined; + } + + // Check if segment contains array notation like "answers[0]" or just "[0]" + const arrayMatch = segment.match(/^([^\[]*)\[(\d+)\]$/); + + if (arrayMatch) { + const [, propertyName, indexStr] = arrayMatch; + const index = parseInt(indexStr, 10); + + // If there's a property name before the bracket, access it first + if (propertyName) { + if (typeof current !== "object") { + return undefined; + } + current = (current as Record)[propertyName]; + } + + // Now access the array index + if (!Array.isArray(current)) { + return undefined; + } + current = current[index]; + } else { + // Regular property access + if (typeof current !== "object") { + return undefined; + } + current = (current as Record)[segment]; + } + } + + return current; +} diff --git a/apps/web/lib/connector/actions.ts b/apps/web/lib/connector/actions.ts new file mode 100644 index 0000000000..45c91aaa9c --- /dev/null +++ b/apps/web/lib/connector/actions.ts @@ -0,0 +1,454 @@ +"use server"; + +import { z } from "zod"; +import { ZId } from "@formbricks/types/common"; +import { + TConnector, + TConnectorFieldMapping, + TConnectorFormbricksMapping, + TConnectorWithMappings, + ZConnectorCreateInput, + ZConnectorFieldMappingCreateInput, + ZConnectorFormbricksMappingCreateInput, + ZConnectorUpdateInput, +} from "@formbricks/types/connector"; +import { authenticatedActionClient } from "@/lib/utils/action-client"; +import { checkAuthorizationUpdated } from "@/lib/utils/action-client/action-client-middleware"; +import { AuthenticatedActionClientCtx } from "@/lib/utils/action-client/types/context"; +import { + getOrganizationIdFromConnectorId, + getOrganizationIdFromEnvironmentId, + getProjectIdFromConnectorId, + getProjectIdFromEnvironmentId, +} from "@/lib/utils/helper"; +import { + createConnector, + createFieldMappings, + createFormbricksMappings, + deleteConnector, + deleteFieldMapping, + deleteFormbricksMapping, + getConnector, + getConnectorWithMappings, + getConnectors, + getConnectorsWithMappings, + syncFieldMappings, + syncFormbricksMappings, + updateConnector, +} from "./service"; + +// Get all connectors for an environment +const ZGetConnectorsAction = z.object({ + environmentId: ZId, +}); + +export const getConnectorsAction = authenticatedActionClient + .schema(ZGetConnectorsAction) + .action(async ({ ctx, parsedInput }): Promise => { + const organizationId = await getOrganizationIdFromEnvironmentId(parsedInput.environmentId); + await checkAuthorizationUpdated({ + userId: ctx.user.id, + organizationId, + access: [ + { + type: "organization", + roles: ["owner", "manager", "member"], + }, + { + type: "projectTeam", + minPermission: "read", + projectId: await getProjectIdFromEnvironmentId(parsedInput.environmentId), + }, + ], + }); + + return getConnectors(parsedInput.environmentId); + }); + +// Get all connectors with mappings for an environment +const ZGetConnectorsWithMappingsAction = z.object({ + environmentId: ZId, +}); + +export const getConnectorsWithMappingsAction = authenticatedActionClient + .schema(ZGetConnectorsWithMappingsAction) + .action(async ({ ctx, parsedInput }): Promise => { + const organizationId = await getOrganizationIdFromEnvironmentId(parsedInput.environmentId); + await checkAuthorizationUpdated({ + userId: ctx.user.id, + organizationId, + access: [ + { + type: "organization", + roles: ["owner", "manager", "member"], + }, + { + type: "projectTeam", + minPermission: "read", + projectId: await getProjectIdFromEnvironmentId(parsedInput.environmentId), + }, + ], + }); + + return getConnectorsWithMappings(parsedInput.environmentId); + }); + +// Get a single connector with mappings +const ZGetConnectorWithMappingsAction = z.object({ + connectorId: ZId, +}); + +export const getConnectorWithMappingsAction = authenticatedActionClient + .schema(ZGetConnectorWithMappingsAction) + .action(async ({ ctx, parsedInput }): Promise => { + const organizationId = await getOrganizationIdFromConnectorId(parsedInput.connectorId); + await checkAuthorizationUpdated({ + userId: ctx.user.id, + organizationId, + access: [ + { + type: "organization", + roles: ["owner", "manager", "member"], + }, + { + type: "projectTeam", + minPermission: "read", + projectId: await getProjectIdFromConnectorId(parsedInput.connectorId), + }, + ], + }); + + return getConnectorWithMappings(parsedInput.connectorId); + }); + +// Create a new connector +const ZCreateConnectorAction = z.object({ + environmentId: ZId, + connectorInput: ZConnectorCreateInput, +}); + +export const createConnectorAction = authenticatedActionClient + .schema(ZCreateConnectorAction) + .action( + async ({ + ctx, + parsedInput, + }: { + ctx: AuthenticatedActionClientCtx; + parsedInput: z.infer; + }) => { + const organizationId = await getOrganizationIdFromEnvironmentId(parsedInput.environmentId); + await checkAuthorizationUpdated({ + userId: ctx.user.id, + organizationId, + access: [ + { + type: "organization", + roles: ["owner", "manager"], + }, + { + type: "projectTeam", + minPermission: "readWrite", + projectId: await getProjectIdFromEnvironmentId(parsedInput.environmentId), + }, + ], + }); + + return createConnector(parsedInput.environmentId, parsedInput.connectorInput); + } + ); + +// Update a connector +const ZUpdateConnectorAction = z.object({ + connectorId: ZId, + connectorInput: ZConnectorUpdateInput, +}); + +export const updateConnectorAction = authenticatedActionClient + .schema(ZUpdateConnectorAction) + .action( + async ({ + ctx, + parsedInput, + }: { + ctx: AuthenticatedActionClientCtx; + parsedInput: z.infer; + }) => { + const organizationId = await getOrganizationIdFromConnectorId(parsedInput.connectorId); + await checkAuthorizationUpdated({ + userId: ctx.user.id, + organizationId, + access: [ + { + type: "organization", + roles: ["owner", "manager"], + }, + { + type: "projectTeam", + minPermission: "readWrite", + projectId: await getProjectIdFromConnectorId(parsedInput.connectorId), + }, + ], + }); + + return updateConnector(parsedInput.connectorId, parsedInput.connectorInput); + } + ); + +// Delete a connector +const ZDeleteConnectorAction = z.object({ + connectorId: ZId, +}); + +export const deleteConnectorAction = authenticatedActionClient + .schema(ZDeleteConnectorAction) + .action( + async ({ + ctx, + parsedInput, + }: { + ctx: AuthenticatedActionClientCtx; + parsedInput: z.infer; + }) => { + const organizationId = await getOrganizationIdFromConnectorId(parsedInput.connectorId); + await checkAuthorizationUpdated({ + userId: ctx.user.id, + organizationId, + access: [ + { + type: "organization", + roles: ["owner", "manager"], + }, + { + type: "projectTeam", + minPermission: "readWrite", + projectId: await getProjectIdFromConnectorId(parsedInput.connectorId), + }, + ], + }); + + return deleteConnector(parsedInput.connectorId); + } + ); + +// Create Formbricks mappings for a connector +const ZCreateFormbricksMappingsAction = z.object({ + connectorId: ZId, + mappings: z.array(ZConnectorFormbricksMappingCreateInput), +}); + +export const createFormbricksMappingsAction = authenticatedActionClient + .schema(ZCreateFormbricksMappingsAction) + .action( + async ({ + ctx, + parsedInput, + }: { + ctx: AuthenticatedActionClientCtx; + parsedInput: z.infer; + }) => { + const organizationId = await getOrganizationIdFromConnectorId(parsedInput.connectorId); + await checkAuthorizationUpdated({ + userId: ctx.user.id, + organizationId, + access: [ + { + type: "organization", + roles: ["owner", "manager"], + }, + { + type: "projectTeam", + minPermission: "readWrite", + projectId: await getProjectIdFromConnectorId(parsedInput.connectorId), + }, + ], + }); + + return createFormbricksMappings(parsedInput.connectorId, parsedInput.mappings); + } + ); + +// Sync (replace) Formbricks mappings for a connector +const ZSyncFormbricksMappingsAction = z.object({ + connectorId: ZId, + mappings: z.array(ZConnectorFormbricksMappingCreateInput), +}); + +export const syncFormbricksMappingsAction = authenticatedActionClient + .schema(ZSyncFormbricksMappingsAction) + .action( + async ({ + ctx, + parsedInput, + }: { + ctx: AuthenticatedActionClientCtx; + parsedInput: z.infer; + }) => { + const organizationId = await getOrganizationIdFromConnectorId(parsedInput.connectorId); + await checkAuthorizationUpdated({ + userId: ctx.user.id, + organizationId, + access: [ + { + type: "organization", + roles: ["owner", "manager"], + }, + { + type: "projectTeam", + minPermission: "readWrite", + projectId: await getProjectIdFromConnectorId(parsedInput.connectorId), + }, + ], + }); + + return syncFormbricksMappings(parsedInput.connectorId, parsedInput.mappings); + } + ); + +// Delete a Formbricks mapping +const ZDeleteFormbricksMappingAction = z.object({ + mappingId: ZId, + connectorId: ZId, // For authorization +}); + +export const deleteFormbricksMappingAction = authenticatedActionClient + .schema(ZDeleteFormbricksMappingAction) + .action( + async ({ + ctx, + parsedInput, + }: { + ctx: AuthenticatedActionClientCtx; + parsedInput: z.infer; + }) => { + const organizationId = await getOrganizationIdFromConnectorId(parsedInput.connectorId); + await checkAuthorizationUpdated({ + userId: ctx.user.id, + organizationId, + access: [ + { + type: "organization", + roles: ["owner", "manager"], + }, + { + type: "projectTeam", + minPermission: "readWrite", + projectId: await getProjectIdFromConnectorId(parsedInput.connectorId), + }, + ], + }); + + return deleteFormbricksMapping(parsedInput.mappingId); + } + ); + +// Create field mappings for a connector (webhook, csv, etc.) +const ZCreateFieldMappingsAction = z.object({ + connectorId: ZId, + mappings: z.array(ZConnectorFieldMappingCreateInput), +}); + +export const createFieldMappingsAction = authenticatedActionClient + .schema(ZCreateFieldMappingsAction) + .action( + async ({ + ctx, + parsedInput, + }: { + ctx: AuthenticatedActionClientCtx; + parsedInput: z.infer; + }) => { + const organizationId = await getOrganizationIdFromConnectorId(parsedInput.connectorId); + await checkAuthorizationUpdated({ + userId: ctx.user.id, + organizationId, + access: [ + { + type: "organization", + roles: ["owner", "manager"], + }, + { + type: "projectTeam", + minPermission: "readWrite", + projectId: await getProjectIdFromConnectorId(parsedInput.connectorId), + }, + ], + }); + + return createFieldMappings(parsedInput.connectorId, parsedInput.mappings); + } + ); + +// Sync (replace) field mappings for a connector +const ZSyncFieldMappingsAction = z.object({ + connectorId: ZId, + mappings: z.array(ZConnectorFieldMappingCreateInput), +}); + +export const syncFieldMappingsAction = authenticatedActionClient + .schema(ZSyncFieldMappingsAction) + .action( + async ({ + ctx, + parsedInput, + }: { + ctx: AuthenticatedActionClientCtx; + parsedInput: z.infer; + }) => { + const organizationId = await getOrganizationIdFromConnectorId(parsedInput.connectorId); + await checkAuthorizationUpdated({ + userId: ctx.user.id, + organizationId, + access: [ + { + type: "organization", + roles: ["owner", "manager"], + }, + { + type: "projectTeam", + minPermission: "readWrite", + projectId: await getProjectIdFromConnectorId(parsedInput.connectorId), + }, + ], + }); + + return syncFieldMappings(parsedInput.connectorId, parsedInput.mappings); + } + ); + +// Delete a field mapping +const ZDeleteFieldMappingAction = z.object({ + mappingId: ZId, + connectorId: ZId, // For authorization +}); + +export const deleteFieldMappingAction = authenticatedActionClient + .schema(ZDeleteFieldMappingAction) + .action( + async ({ + ctx, + parsedInput, + }: { + ctx: AuthenticatedActionClientCtx; + parsedInput: z.infer; + }) => { + const organizationId = await getOrganizationIdFromConnectorId(parsedInput.connectorId); + await checkAuthorizationUpdated({ + userId: ctx.user.id, + organizationId, + access: [ + { + type: "organization", + roles: ["owner", "manager"], + }, + { + type: "projectTeam", + minPermission: "readWrite", + projectId: await getProjectIdFromConnectorId(parsedInput.connectorId), + }, + ], + }); + + return deleteFieldMapping(parsedInput.mappingId); + } + ); diff --git a/apps/web/lib/connector/hub-client.ts b/apps/web/lib/connector/hub-client.ts new file mode 100644 index 0000000000..305771f8c7 --- /dev/null +++ b/apps/web/lib/connector/hub-client.ts @@ -0,0 +1,315 @@ +import "server-only"; +import { logger } from "@formbricks/logger"; + +// Hub API base URL - should be configurable via environment variable +const HUB_API_URL = process.env.HUB_API_URL || "http://localhost:8080"; +const HUB_API_KEY = process.env.HUB_API_KEY || ""; + +// Hub field types (from OpenAPI spec) +export type THubFieldType = + | "text" + | "categorical" + | "nps" + | "csat" + | "ces" + | "rating" + | "number" + | "boolean" + | "date"; + +// Create FeedbackRecord input +export interface TCreateFeedbackRecordInput { + collected_at?: string; // ISO 8601 datetime, defaults to now + source_type: string; // Required + field_id: string; // Required + field_type: THubFieldType; // Required + tenant_id?: string; + response_id?: string; + source_id?: string; + source_name?: string; + field_label?: string; + value_text?: string; + value_number?: number; + value_boolean?: boolean; + value_date?: string; + metadata?: Record; + language?: string; + user_identifier?: string; +} + +// FeedbackRecord data (response from Hub) +export interface TFeedbackRecordData { + id: string; + collected_at: string; + created_at: string; + updated_at: string; + source_type: string; + field_id: string; + field_type: string; + tenant_id?: string; + response_id?: string; + source_id?: string; + source_name?: string; + field_label?: string; + value_text?: string; + value_number?: number; + value_boolean?: boolean; + value_date?: string; + metadata?: Record; + language?: string; + user_identifier?: string; +} + +// List FeedbackRecords response +export interface TListFeedbackRecordsResponse { + data: TFeedbackRecordData[]; + total: number; + limit: number; + offset: number; +} + +// Update FeedbackRecord input +export interface TUpdateFeedbackRecordInput { + value_text?: string; + value_number?: number; + value_boolean?: boolean; + value_date?: string; + metadata?: Record; + language?: string; + user_identifier?: string; +} + +// List FeedbackRecords filters +export interface TListFeedbackRecordsFilters { + tenant_id?: string; + response_id?: string; + source_type?: string; + source_id?: string; + field_id?: string; + field_type?: string; + user_identifier?: string; + since?: string; // ISO 8601 + until?: string; // ISO 8601 + limit?: number; + offset?: number; +} + +// Error response from Hub +export interface THubErrorResponse { + type?: string; + title: string; + status: number; + detail: string; + instance?: string; + errors?: Array<{ + location?: string; + message?: string; + value?: unknown; + }>; +} + +// Hub API Error class +export class HubApiError extends Error { + status: number; + detail: string; + errors?: THubErrorResponse["errors"]; + + constructor(response: THubErrorResponse) { + super(response.detail || response.title); + this.name = "HubApiError"; + this.status = response.status; + this.detail = response.detail; + this.errors = response.errors; + } +} + +// Make authenticated request to Hub API +async function hubFetch( + path: string, + options: RequestInit = {} +): Promise<{ data: T | null; error: HubApiError | null }> { + const url = `${HUB_API_URL}${path}`; + + const headers: HeadersInit = { + "Content-Type": "application/json", + ...(HUB_API_KEY && { Authorization: `Bearer ${HUB_API_KEY}` }), + ...options.headers, + }; + + try { + const response = await fetch(url, { + ...options, + headers, + }); + + // Handle no content response (e.g., DELETE) + if (response.status === 204) { + return { data: null, error: null }; + } + + const contentType = response.headers.get("content-type"); + + if (!response.ok) { + // Try to parse error response + if (contentType?.includes("application/problem+json") || contentType?.includes("application/json")) { + const errorBody = (await response.json()) as THubErrorResponse; + return { data: null, error: new HubApiError(errorBody) }; + } + + // Fallback for non-JSON errors + const errorText = await response.text(); + return { + data: null, + error: new HubApiError({ + title: "Error", + status: response.status, + detail: errorText || `HTTP ${response.status}`, + }), + }; + } + + // Parse successful response + if (contentType?.includes("application/json")) { + const data = (await response.json()) as T; + return { data, error: null }; + } + + return { data: null, error: null }; + } catch (error) { + logger.error("Hub API request failed", { url, error }); + return { + data: null, + error: new HubApiError({ + title: "Network Error", + status: 0, + detail: error instanceof Error ? error.message : "Failed to connect to Hub API", + }), + }; + } +} + +/** + * Create a new FeedbackRecord in the Hub + */ +export async function createFeedbackRecord( + input: TCreateFeedbackRecordInput +): Promise<{ data: TFeedbackRecordData | null; error: HubApiError | null }> { + return hubFetch("/v1/feedback-records", { + method: "POST", + body: JSON.stringify(input), + }); +} + +/** + * Create multiple FeedbackRecords in the Hub (batch) + */ +export async function createFeedbackRecordsBatch( + inputs: TCreateFeedbackRecordInput[] +): Promise<{ results: Array<{ data: TFeedbackRecordData | null; error: HubApiError | null }> }> { + // Hub doesn't have a batch endpoint, so we'll do parallel requests + // In production, you might want to implement rate limiting or chunking + const results = await Promise.all(inputs.map((input) => createFeedbackRecord(input))); + return { results }; +} + +/** + * List FeedbackRecords from the Hub with optional filters + */ +export async function listFeedbackRecords( + filters: TListFeedbackRecordsFilters = {} +): Promise<{ data: TListFeedbackRecordsResponse | null; error: HubApiError | null }> { + const searchParams = new URLSearchParams(); + + if (filters.tenant_id) searchParams.set("tenant_id", filters.tenant_id); + if (filters.response_id) searchParams.set("response_id", filters.response_id); + if (filters.source_type) searchParams.set("source_type", filters.source_type); + if (filters.source_id) searchParams.set("source_id", filters.source_id); + if (filters.field_id) searchParams.set("field_id", filters.field_id); + if (filters.field_type) searchParams.set("field_type", filters.field_type); + if (filters.user_identifier) searchParams.set("user_identifier", filters.user_identifier); + if (filters.since) searchParams.set("since", filters.since); + if (filters.until) searchParams.set("until", filters.until); + if (filters.limit !== undefined) searchParams.set("limit", String(filters.limit)); + if (filters.offset !== undefined) searchParams.set("offset", String(filters.offset)); + + const queryString = searchParams.toString(); + const path = queryString ? `/v1/feedback-records?${queryString}` : "/v1/feedback-records"; + + return hubFetch(path, { method: "GET" }); +} + +/** + * Get a single FeedbackRecord from the Hub by ID + */ +export async function getFeedbackRecord( + id: string +): Promise<{ data: TFeedbackRecordData | null; error: HubApiError | null }> { + return hubFetch(`/v1/feedback-records/${id}`, { method: "GET" }); +} + +/** + * Update a FeedbackRecord in the Hub + */ +export async function updateFeedbackRecord( + id: string, + input: TUpdateFeedbackRecordInput +): Promise<{ data: TFeedbackRecordData | null; error: HubApiError | null }> { + return hubFetch(`/v1/feedback-records/${id}`, { + method: "PATCH", + body: JSON.stringify(input), + }); +} + +/** + * Delete a FeedbackRecord from the Hub + */ +export async function deleteFeedbackRecord(id: string): Promise<{ error: HubApiError | null }> { + const result = await hubFetch(`/v1/feedback-records/${id}`, { method: "DELETE" }); + return { error: result.error }; +} + +/** + * Bulk delete FeedbackRecords by user identifier (GDPR compliance) + */ +export async function bulkDeleteFeedbackRecordsByUser( + userIdentifier: string, + tenantId?: string +): Promise<{ data: { deleted_count: number; message: string } | null; error: HubApiError | null }> { + const searchParams = new URLSearchParams(); + searchParams.set("user_identifier", userIdentifier); + if (tenantId) searchParams.set("tenant_id", tenantId); + + return hubFetch<{ deleted_count: number; message: string }>( + `/v1/feedback-records?${searchParams.toString()}`, + { method: "DELETE" } + ); +} + +/** + * Check Hub API health + */ +export async function checkHubHealth(): Promise<{ healthy: boolean; error: HubApiError | null }> { + try { + const response = await fetch(`${HUB_API_URL}/health`); + if (response.ok) { + return { healthy: true, error: null }; + } + return { + healthy: false, + error: new HubApiError({ + title: "Health Check Failed", + status: response.status, + detail: "Hub API health check failed", + }), + }; + } catch (error) { + return { + healthy: false, + error: new HubApiError({ + title: "Network Error", + status: 0, + detail: error instanceof Error ? error.message : "Failed to connect to Hub API", + }), + }; + } +} diff --git a/apps/web/lib/connector/pipeline-handler.ts b/apps/web/lib/connector/pipeline-handler.ts new file mode 100644 index 0000000000..e45a25d0b2 --- /dev/null +++ b/apps/web/lib/connector/pipeline-handler.ts @@ -0,0 +1,138 @@ +import "server-only"; +import { logger } from "@formbricks/logger"; +import { TResponse } from "@formbricks/types/responses"; +import { TSurvey } from "@formbricks/types/surveys/types"; +import { createFeedbackRecordsBatch } from "./hub-client"; +import { getConnectorsBySurveyId, updateConnector } from "./service"; +import { transformResponseToFeedbackRecords } from "./transform"; + +/** + * Handle connector pipeline for a survey response + * + * This function is called from the pipeline when a response is created/finished. + * It looks up active connectors for the survey and sends the response data to the Hub. + * + * @param response - The survey response + * @param survey - The survey + * @param environmentId - The environment ID (used as tenant_id) + */ +export async function handleConnectorPipeline( + response: TResponse, + survey: TSurvey, + environmentId: string +): Promise { + try { + // Get all active Formbricks connectors for this survey + const connectors = await getConnectorsBySurveyId(survey.id); + + if (connectors.length === 0) { + // No connectors configured for this survey + return; + } + + // Process each connector + for (const connector of connectors) { + try { + // Transform response to FeedbackRecords using the connector's mappings + const feedbackRecords = transformResponseToFeedbackRecords( + response, + survey, + connector.formbricksMappings, + environmentId // Use environment ID as tenant_id + ); + + if (feedbackRecords.length === 0) { + // No mapped elements had values in this response + continue; + } + + // Send to Hub API + const { results } = await createFeedbackRecordsBatch(feedbackRecords); + + // Count successes and failures + const successes = results.filter((r) => r.data !== null).length; + const failures = results.filter((r) => r.error !== null).length; + + if (failures > 0) { + logger.warn( + { + connectorId: connector.id, + surveyId: survey.id, + responseId: response.id, + successes, + failures, + }, + `Connector pipeline: ${failures}/${feedbackRecords.length} FeedbackRecords failed to send` + ); + + // Log the specific errors + results.forEach((result, index) => { + if (result.error) { + logger.error( + { + connectorId: connector.id, + feedbackRecordIndex: index, + error: { + status: result.error.status, + message: result.error.message, + detail: result.error.detail, + }, + }, + "Failed to create FeedbackRecord in Hub" + ); + } + }); + + // Update connector with error message if all failed + if (successes === 0) { + await updateConnector(connector.id, { + errorMessage: `Failed to send FeedbackRecords to Hub: ${results[0].error?.message || "Unknown error"}`, + }); + } + } else { + logger.info( + { + connectorId: connector.id, + surveyId: survey.id, + responseId: response.id, + feedbackRecordsCreated: successes, + }, + `Connector pipeline: Successfully sent ${successes} FeedbackRecords to Hub` + ); + + // Clear any previous error and update lastSyncAt + await updateConnector(connector.id, { + errorMessage: null, + lastSyncAt: new Date(), + }); + } + } catch (error) { + logger.error( + { + connectorId: connector.id, + surveyId: survey.id, + responseId: response.id, + error: error instanceof Error ? error.message : "Unknown error", + }, + "Connector pipeline: Failed to process connector" + ); + + // Update connector with error + await updateConnector(connector.id, { + status: "error", + errorMessage: error instanceof Error ? error.message : "Unknown error", + }); + } + } + } catch (error) { + // Log but don't throw - we don't want to break the main pipeline + logger.error( + { + surveyId: survey.id, + responseId: response.id, + error: error instanceof Error ? error.message : "Unknown error", + }, + "Connector pipeline: Failed to handle connectors" + ); + } +} diff --git a/apps/web/lib/connector/service.ts b/apps/web/lib/connector/service.ts new file mode 100644 index 0000000000..af6dfd9d06 --- /dev/null +++ b/apps/web/lib/connector/service.ts @@ -0,0 +1,663 @@ +import "server-only"; +import { Prisma } from "@prisma/client"; +import { cache as reactCache } from "react"; +import { prisma } from "@formbricks/database"; +import { PrismaErrorType } from "@formbricks/database/types/error"; +import { ZId, ZOptionalNumber } from "@formbricks/types/common"; +import { + TConnector, + TConnectorCreateInput, + TConnectorFieldMapping, + TConnectorFieldMappingCreateInput, + TConnectorFormbricksMapping, + TConnectorFormbricksMappingCreateInput, + TConnectorUpdateInput, + TConnectorWithMappings, + TFormbricksConnector, + ZConnectorCreateInput, + ZConnectorFieldMappingCreateInput, + ZConnectorFormbricksMappingCreateInput, + ZConnectorUpdateInput, +} from "@formbricks/types/connector"; +import { Result, err, ok } from "@formbricks/types/error-handlers"; +import { DatabaseError, ResourceNotFoundError } from "@formbricks/types/errors"; +import { ITEMS_PER_PAGE } from "../constants"; +import { validateInputs } from "../utils/validate"; + +// Select object for Connector with Formbricks mappings +const selectConnectorWithFormbricksMappings = { + id: true, + createdAt: true, + updatedAt: true, + name: true, + type: true, + status: true, + environmentId: true, + config: true, + lastSyncAt: true, + errorMessage: true, + formbricksMappings: { + select: { + id: true, + createdAt: true, + connectorId: true, + surveyId: true, + elementId: true, + hubFieldType: true, + customFieldLabel: true, + survey: { + select: { + id: true, + name: true, + status: true, + }, + }, + }, + }, + fieldMappings: { + select: { + id: true, + createdAt: true, + connectorId: true, + sourceFieldId: true, + targetFieldId: true, + staticValue: true, + }, + }, +} satisfies Prisma.ConnectorSelect; + +// Select object for Connector without mappings +const selectConnector = { + id: true, + createdAt: true, + updatedAt: true, + name: true, + type: true, + status: true, + environmentId: true, + config: true, + lastSyncAt: true, + errorMessage: true, +} satisfies Prisma.ConnectorSelect; + +/** + * Get all connectors for an environment + */ +export const getConnectors = reactCache( + async (environmentId: string, page?: number): Promise => { + validateInputs([environmentId, ZId], [page, ZOptionalNumber]); + + try { + const connectors = await prisma.connector.findMany({ + where: { + environmentId, + }, + select: selectConnector, + orderBy: { + createdAt: "desc", + }, + take: page ? ITEMS_PER_PAGE : undefined, + skip: page ? ITEMS_PER_PAGE * (page - 1) : undefined, + }); + + return connectors as TConnector[]; + } catch (error) { + if (error instanceof Prisma.PrismaClientKnownRequestError) { + throw new DatabaseError(error.message); + } + throw error; + } + } +); + +/** + * Get all connectors for an environment with their mappings + */ +export const getConnectorsWithMappings = reactCache( + async (environmentId: string, page?: number): Promise => { + validateInputs([environmentId, ZId], [page, ZOptionalNumber]); + + try { + const connectors = await prisma.connector.findMany({ + where: { + environmentId, + }, + select: selectConnectorWithFormbricksMappings, + orderBy: { + createdAt: "desc", + }, + take: page ? ITEMS_PER_PAGE : undefined, + skip: page ? ITEMS_PER_PAGE * (page - 1) : undefined, + }); + + return connectors as unknown as TConnectorWithMappings[]; + } catch (error) { + if (error instanceof Prisma.PrismaClientKnownRequestError) { + throw new DatabaseError(error.message); + } + throw error; + } + } +); + +/** + * Get a single connector by ID + */ +export const getConnector = reactCache(async (connectorId: string): Promise => { + validateInputs([connectorId, ZId]); + + try { + const connector = await prisma.connector.findUnique({ + where: { + id: connectorId, + }, + select: selectConnector, + }); + + return connector as TConnector | null; + } catch (error) { + if (error instanceof Prisma.PrismaClientKnownRequestError) { + throw new DatabaseError(error.message); + } + throw error; + } +}); + +/** + * Get a single connector by ID with its mappings + */ +export const getConnectorWithMappings = reactCache( + async (connectorId: string): Promise => { + validateInputs([connectorId, ZId]); + + try { + const connector = await prisma.connector.findUnique({ + where: { + id: connectorId, + }, + select: selectConnectorWithFormbricksMappings, + }); + + return connector as unknown as TConnectorWithMappings | null; + } catch (error) { + if (error instanceof Prisma.PrismaClientKnownRequestError) { + throw new DatabaseError(error.message); + } + throw error; + } + } +); + +/** + * Get all Formbricks connectors that have mappings for a specific survey + */ +export const getConnectorsBySurveyId = reactCache( + async (surveyId: string): Promise => { + validateInputs([surveyId, ZId]); + + try { + const connectors = await prisma.connector.findMany({ + where: { + type: "formbricks", + status: "active", + formbricksMappings: { + some: { + surveyId, + }, + }, + }, + select: selectConnectorWithFormbricksMappings, + }); + + return connectors as unknown as TFormbricksConnector[]; + } catch (error) { + if (error instanceof Prisma.PrismaClientKnownRequestError) { + throw new DatabaseError(error.message); + } + throw error; + } + } +); + +export enum ConnectorError { + CONNECTOR_NOT_FOUND = "CONNECTOR_NOT_FOUND", + DUPLICATE_MAPPING = "DUPLICATE_MAPPING", + UNEXPECTED_ERROR = "UNEXPECTED_ERROR", +} + +/** + * Create a new connector + */ +export const createConnector = async ( + environmentId: string, + data: TConnectorCreateInput +): Promise> => { + validateInputs([environmentId, ZId], [data, ZConnectorCreateInput]); + + try { + const connector = await prisma.connector.create({ + data: { + name: data.name, + type: data.type, + environmentId, + config: data.config ?? Prisma.JsonNull, + }, + select: selectConnector, + }); + + return ok(connector as TConnector); + } catch (error) { + if (error instanceof Prisma.PrismaClientKnownRequestError) { + return err({ + code: ConnectorError.UNEXPECTED_ERROR, + message: error.message, + }); + } + return err({ + code: ConnectorError.UNEXPECTED_ERROR, + message: error instanceof Error ? error.message : "Unknown error", + }); + } +}; + +/** + * Update a connector + */ +export const updateConnector = async ( + connectorId: string, + data: TConnectorUpdateInput +): Promise> => { + validateInputs([connectorId, ZId], [data, ZConnectorUpdateInput]); + + try { + const connector = await prisma.connector.update({ + where: { + id: connectorId, + }, + data: { + name: data.name, + status: data.status, + config: data.config ?? undefined, + errorMessage: data.errorMessage, + lastSyncAt: data.lastSyncAt, + }, + select: selectConnector, + }); + + return ok(connector as TConnector); + } catch (error) { + if (error instanceof Prisma.PrismaClientKnownRequestError) { + if (error.code === PrismaErrorType.RecordNotFound) { + return err({ + code: ConnectorError.CONNECTOR_NOT_FOUND, + message: "Connector not found", + }); + } + return err({ + code: ConnectorError.UNEXPECTED_ERROR, + message: error.message, + }); + } + return err({ + code: ConnectorError.UNEXPECTED_ERROR, + message: error instanceof Error ? error.message : "Unknown error", + }); + } +}; + +/** + * Delete a connector + */ +export const deleteConnector = async ( + connectorId: string +): Promise> => { + validateInputs([connectorId, ZId]); + + try { + const connector = await prisma.connector.delete({ + where: { + id: connectorId, + }, + select: selectConnector, + }); + + return ok(connector as TConnector); + } catch (error) { + if (error instanceof Prisma.PrismaClientKnownRequestError) { + if (error.code === PrismaErrorType.RecordNotFound) { + return err({ + code: ConnectorError.CONNECTOR_NOT_FOUND, + message: "Connector not found", + }); + } + return err({ + code: ConnectorError.UNEXPECTED_ERROR, + message: error.message, + }); + } + return err({ + code: ConnectorError.UNEXPECTED_ERROR, + message: error instanceof Error ? error.message : "Unknown error", + }); + } +}; + +/** + * Create Formbricks element mappings for a connector + */ +export const createFormbricksMappings = async ( + connectorId: string, + mappings: TConnectorFormbricksMappingCreateInput[] +): Promise> => { + validateInputs([connectorId, ZId]); + mappings.forEach((mapping) => validateInputs([mapping, ZConnectorFormbricksMappingCreateInput])); + + try { + const createdMappings = await prisma.$transaction( + mappings.map((mapping) => + prisma.connectorFormbricksMapping.create({ + data: { + connectorId, + surveyId: mapping.surveyId, + elementId: mapping.elementId, + hubFieldType: mapping.hubFieldType, + customFieldLabel: mapping.customFieldLabel, + }, + }) + ) + ); + + return ok(createdMappings as TConnectorFormbricksMapping[]); + } catch (error) { + if (error instanceof Prisma.PrismaClientKnownRequestError) { + if (error.code === PrismaErrorType.UniqueConstraintViolation) { + return err({ + code: ConnectorError.DUPLICATE_MAPPING, + message: "A mapping for this survey element already exists", + }); + } + return err({ + code: ConnectorError.UNEXPECTED_ERROR, + message: error.message, + }); + } + return err({ + code: ConnectorError.UNEXPECTED_ERROR, + message: error instanceof Error ? error.message : "Unknown error", + }); + } +}; + +/** + * Sync Formbricks mappings - replaces all existing mappings with new ones + */ +export const syncFormbricksMappings = async ( + connectorId: string, + mappings: TConnectorFormbricksMappingCreateInput[] +): Promise> => { + validateInputs([connectorId, ZId]); + mappings.forEach((mapping) => validateInputs([mapping, ZConnectorFormbricksMappingCreateInput])); + + try { + // Delete all existing mappings and create new ones in a transaction + const createdMappings = await prisma.$transaction(async (tx) => { + // Delete existing mappings + await tx.connectorFormbricksMapping.deleteMany({ + where: { + connectorId, + }, + }); + + // Create new mappings + const newMappings = await Promise.all( + mappings.map((mapping) => + tx.connectorFormbricksMapping.create({ + data: { + connectorId, + surveyId: mapping.surveyId, + elementId: mapping.elementId, + hubFieldType: mapping.hubFieldType, + customFieldLabel: mapping.customFieldLabel, + }, + }) + ) + ); + + return newMappings; + }); + + return ok(createdMappings as TConnectorFormbricksMapping[]); + } catch (error) { + if (error instanceof Prisma.PrismaClientKnownRequestError) { + return err({ + code: ConnectorError.UNEXPECTED_ERROR, + message: error.message, + }); + } + return err({ + code: ConnectorError.UNEXPECTED_ERROR, + message: error instanceof Error ? error.message : "Unknown error", + }); + } +}; + +/** + * Delete a Formbricks mapping + */ +export const deleteFormbricksMapping = async ( + mappingId: string +): Promise> => { + validateInputs([mappingId, ZId]); + + try { + const mapping = await prisma.connectorFormbricksMapping.delete({ + where: { + id: mappingId, + }, + }); + + return ok(mapping as TConnectorFormbricksMapping); + } catch (error) { + if (error instanceof Prisma.PrismaClientKnownRequestError) { + if (error.code === PrismaErrorType.RecordNotFound) { + return err({ + code: ConnectorError.CONNECTOR_NOT_FOUND, + message: "Mapping not found", + }); + } + return err({ + code: ConnectorError.UNEXPECTED_ERROR, + message: error.message, + }); + } + return err({ + code: ConnectorError.UNEXPECTED_ERROR, + message: error instanceof Error ? error.message : "Unknown error", + }); + } +}; + +/** + * Create field mappings for a connector (webhook, csv, email, slack) + */ +export const createFieldMappings = async ( + connectorId: string, + mappings: TConnectorFieldMappingCreateInput[] +): Promise> => { + validateInputs([connectorId, ZId]); + mappings.forEach((mapping) => validateInputs([mapping, ZConnectorFieldMappingCreateInput])); + + try { + const createdMappings = await prisma.$transaction( + mappings.map((mapping) => + prisma.connectorFieldMapping.create({ + data: { + connectorId, + sourceFieldId: mapping.sourceFieldId, + targetFieldId: mapping.targetFieldId, + staticValue: mapping.staticValue, + }, + }) + ) + ); + + return ok(createdMappings as TConnectorFieldMapping[]); + } catch (error) { + if (error instanceof Prisma.PrismaClientKnownRequestError) { + if (error.code === PrismaErrorType.UniqueConstraintViolation) { + return err({ + code: ConnectorError.DUPLICATE_MAPPING, + message: "A mapping for this field already exists", + }); + } + return err({ + code: ConnectorError.UNEXPECTED_ERROR, + message: error.message, + }); + } + return err({ + code: ConnectorError.UNEXPECTED_ERROR, + message: error instanceof Error ? error.message : "Unknown error", + }); + } +}; + +/** + * Sync field mappings - replaces all existing field mappings with new ones + */ +export const syncFieldMappings = async ( + connectorId: string, + mappings: TConnectorFieldMappingCreateInput[] +): Promise> => { + validateInputs([connectorId, ZId]); + mappings.forEach((mapping) => validateInputs([mapping, ZConnectorFieldMappingCreateInput])); + + try { + // Delete all existing mappings and create new ones in a transaction + const createdMappings = await prisma.$transaction(async (tx) => { + // Delete existing mappings + await tx.connectorFieldMapping.deleteMany({ + where: { + connectorId, + }, + }); + + // Create new mappings + const newMappings = await Promise.all( + mappings.map((mapping) => + tx.connectorFieldMapping.create({ + data: { + connectorId, + sourceFieldId: mapping.sourceFieldId, + targetFieldId: mapping.targetFieldId, + staticValue: mapping.staticValue, + }, + }) + ) + ); + + return newMappings; + }); + + return ok(createdMappings as TConnectorFieldMapping[]); + } catch (error) { + if (error instanceof Prisma.PrismaClientKnownRequestError) { + return err({ + code: ConnectorError.UNEXPECTED_ERROR, + message: error.message, + }); + } + return err({ + code: ConnectorError.UNEXPECTED_ERROR, + message: error instanceof Error ? error.message : "Unknown error", + }); + } +}; + +/** + * Delete a field mapping + */ +export const deleteFieldMapping = async ( + mappingId: string +): Promise> => { + validateInputs([mappingId, ZId]); + + try { + const mapping = await prisma.connectorFieldMapping.delete({ + where: { + id: mappingId, + }, + }); + + return ok(mapping as TConnectorFieldMapping); + } catch (error) { + if (error instanceof Prisma.PrismaClientKnownRequestError) { + if (error.code === PrismaErrorType.RecordNotFound) { + return err({ + code: ConnectorError.CONNECTOR_NOT_FOUND, + message: "Mapping not found", + }); + } + return err({ + code: ConnectorError.UNEXPECTED_ERROR, + message: error.message, + }); + } + return err({ + code: ConnectorError.UNEXPECTED_ERROR, + message: error instanceof Error ? error.message : "Unknown error", + }); + } +}; + +/** + * Get Formbricks mappings for a connector + */ +export const getFormbricksMappings = reactCache( + async (connectorId: string): Promise => { + validateInputs([connectorId, ZId]); + + try { + const mappings = await prisma.connectorFormbricksMapping.findMany({ + where: { + connectorId, + }, + orderBy: { + createdAt: "asc", + }, + }); + + return mappings as TConnectorFormbricksMapping[]; + } catch (error) { + if (error instanceof Prisma.PrismaClientKnownRequestError) { + throw new DatabaseError(error.message); + } + throw error; + } + } +); + +/** + * Get field mappings for a connector + */ +export const getFieldMappings = reactCache(async (connectorId: string): Promise => { + validateInputs([connectorId, ZId]); + + try { + const mappings = await prisma.connectorFieldMapping.findMany({ + where: { + connectorId, + }, + orderBy: { + createdAt: "asc", + }, + }); + + return mappings as TConnectorFieldMapping[]; + } catch (error) { + if (error instanceof Prisma.PrismaClientKnownRequestError) { + throw new DatabaseError(error.message); + } + throw error; + } +}); diff --git a/apps/web/lib/connector/transform.ts b/apps/web/lib/connector/transform.ts new file mode 100644 index 0000000000..3d883533b0 --- /dev/null +++ b/apps/web/lib/connector/transform.ts @@ -0,0 +1,375 @@ +import "server-only"; +import { TConnectorFormbricksMapping, THubFieldType } from "@formbricks/types/connector"; +import { TResponse } from "@formbricks/types/responses"; +import { TSurvey } from "@formbricks/types/surveys/types"; +import { TCreateFeedbackRecordInput, THubFieldType as THubClientFieldType } from "./hub-client"; + +// Response data value types +type TResponseValue = string | number | string[] | Record | undefined; + +/** + * Get the headline of an element from a survey + */ +function getElementHeadline(survey: TSurvey, elementId: string): string { + // Try to find in blocks first + if (survey.blocks && survey.blocks.length > 0) { + for (const block of survey.blocks) { + if (block.elements) { + for (const element of block.elements) { + if (element.id === elementId) { + const headline = element.headline; + if (!headline) return "Untitled"; + if (typeof headline === "string") return headline; + if (typeof headline === "object" && headline.default) return headline.default; + return "Untitled"; + } + } + } + } + } + + // Fallback to legacy questions + if (survey.questions && Array.isArray(survey.questions)) { + for (const question of survey.questions as Array<{ + id: string; + headline?: string | { default?: string }; + }>) { + if (question.id === elementId) { + const headline = question.headline; + if (!headline) return "Untitled"; + if (typeof headline === "string") return headline; + if (typeof headline === "object" && headline.default) return headline.default; + return "Untitled"; + } + } + } + + return "Untitled"; +} + +/** + * Extract the value from a response for a specific element + */ +function extractResponseValue(responseData: TResponse["data"], elementId: string): TResponseValue { + if (!responseData || typeof responseData !== "object") return undefined; + return (responseData as Record)[elementId]; +} + +/** + * Convert a response value to the appropriate Hub value fields + */ +function convertValueToHubFields( + value: TResponseValue, + hubFieldType: THubFieldType +): Partial> { + if (value === undefined || value === null) { + return {}; + } + + switch (hubFieldType) { + case "text": + // Text values - could be string or array of strings + if (typeof value === "string") { + return { value_text: value }; + } + if (Array.isArray(value)) { + return { value_text: value.join(", ") }; + } + if (typeof value === "object") { + // Handle address-like objects + return { value_text: JSON.stringify(value) }; + } + return { value_text: String(value) }; + + case "number": + case "rating": + case "nps": + case "csat": + case "ces": + // Numeric values + if (typeof value === "number") { + return { value_number: value }; + } + if (typeof value === "string") { + const parsed = parseFloat(value); + if (!isNaN(parsed)) { + return { value_number: parsed }; + } + } + return {}; + + case "boolean": + // Boolean values + if (typeof value === "boolean") { + return { value_boolean: value }; + } + if (typeof value === "string") { + return { value_boolean: value.toLowerCase() === "true" || value === "1" }; + } + return {}; + + case "date": + // Date values + if (typeof value === "string") { + return { value_date: value }; + } + if (value instanceof Date) { + return { value_date: value.toISOString() }; + } + return {}; + + case "categorical": + // Categorical values (like multiple choice) + if (typeof value === "string") { + return { value_text: value }; + } + if (Array.isArray(value)) { + return { value_text: value.join(", ") }; + } + return { value_text: String(value) }; + + default: + // Default to text + if (typeof value === "string") { + return { value_text: value }; + } + return { value_text: String(value) }; + } +} + +/** + * Transform a Formbricks survey response into Hub FeedbackRecord payloads + * + * @param response - The Formbricks response + * @param survey - The survey the response belongs to + * @param mappings - The connector mappings for this survey + * @param tenantId - Optional tenant ID (usually environment or organization ID) + * @returns Array of FeedbackRecord payloads to send to the Hub + */ +export function transformResponseToFeedbackRecords( + response: TResponse, + survey: TSurvey, + mappings: TConnectorFormbricksMapping[], + tenantId?: string +): TCreateFeedbackRecordInput[] { + const feedbackRecords: TCreateFeedbackRecordInput[] = []; + + // Get response data + const responseData = response.data; + if (!responseData) { + return feedbackRecords; + } + + // Filter mappings to only those for this survey + const surveyMappings = mappings.filter((m) => m.surveyId === survey.id); + + // For each mapped element, create a FeedbackRecord + for (const mapping of surveyMappings) { + const value = extractResponseValue(responseData, mapping.elementId); + + // Skip if no value for this element + if (value === undefined || value === null || value === "") { + continue; + } + + // Get element headline (or use custom field label) + const fieldLabel = mapping.customFieldLabel || getElementHeadline(survey, mapping.elementId); + + // Convert value to appropriate Hub fields + const valueFields = convertValueToHubFields(value, mapping.hubFieldType as THubFieldType); + + // Create the FeedbackRecord payload + const feedbackRecord: TCreateFeedbackRecordInput = { + collected_at: response.createdAt.toISOString(), + source_type: "formbricks", + field_id: mapping.elementId, + field_type: mapping.hubFieldType as THubClientFieldType, + source_id: survey.id, + source_name: survey.name, + field_label: fieldLabel, + response_id: response.id, + language: response.language || undefined, + ...valueFields, + }; + + // Add tenant ID if provided + if (tenantId) { + feedbackRecord.tenant_id = tenantId; + } + + // Add user identifier if available + if (response.contactId) { + feedbackRecord.user_identifier = response.contactId; + } + + feedbackRecords.push(feedbackRecord); + } + + return feedbackRecords; +} + +/** + * Transform a webhook payload to a FeedbackRecord using field mappings + * + * @param payload - The incoming webhook payload + * @param mappings - The field mappings for this connector + * @returns FeedbackRecord payload to send to the Hub + */ +export function transformWebhookPayloadToFeedbackRecord( + payload: Record, + mappings: Array<{ + sourceFieldId: string; + targetFieldId: string; + staticValue?: string | null; + }> +): TCreateFeedbackRecordInput { + const feedbackRecord: Record = {}; + + for (const mapping of mappings) { + let value: unknown; + + if (mapping.staticValue) { + // Use static value + value = mapping.staticValue; + + // Handle special static values + if (value === "$now") { + value = new Date().toISOString(); + } + } else { + // Get value from payload using dot notation path + value = getNestedValue(payload, mapping.sourceFieldId); + } + + if (value !== undefined && value !== null) { + feedbackRecord[mapping.targetFieldId] = value; + } + } + + // Ensure required fields have defaults + if (!feedbackRecord.source_type) { + feedbackRecord.source_type = "webhook"; + } + if (!feedbackRecord.collected_at) { + feedbackRecord.collected_at = new Date().toISOString(); + } + if (!feedbackRecord.field_type) { + feedbackRecord.field_type = "text"; + } + + return feedbackRecord as TCreateFeedbackRecordInput; +} + +/** + * Transform a CSV row to a FeedbackRecord using field mappings + * + * @param row - The CSV row as an object (column name -> value) + * @param mappings - The field mappings for this connector + * @returns FeedbackRecord payload to send to the Hub + */ +export function transformCSVRowToFeedbackRecord( + row: Record, + mappings: Array<{ + sourceFieldId: string; + targetFieldId: string; + staticValue?: string | null; + }> +): TCreateFeedbackRecordInput { + const feedbackRecord: Record = {}; + + for (const mapping of mappings) { + let value: unknown; + + if (mapping.staticValue) { + // Use static value + value = mapping.staticValue; + + // Handle special static values + if (value === "$now") { + value = new Date().toISOString(); + } + } else { + // Get value from CSV row + value = row[mapping.sourceFieldId]; + } + + if (value !== undefined && value !== null && value !== "") { + // Try to convert to appropriate type based on target field + if (mapping.targetFieldId === "value_number") { + const parsed = parseFloat(String(value)); + if (!isNaN(parsed)) { + feedbackRecord[mapping.targetFieldId] = parsed; + continue; + } + } + if (mapping.targetFieldId === "value_boolean") { + feedbackRecord[mapping.targetFieldId] = + String(value).toLowerCase() === "true" || String(value) === "1"; + continue; + } + + feedbackRecord[mapping.targetFieldId] = value; + } + } + + // Ensure required fields have defaults + if (!feedbackRecord.source_type) { + feedbackRecord.source_type = "csv"; + } + if (!feedbackRecord.collected_at) { + feedbackRecord.collected_at = new Date().toISOString(); + } + if (!feedbackRecord.field_type) { + feedbackRecord.field_type = "text"; + } + + return feedbackRecord as TCreateFeedbackRecordInput; +} + +/** + * Helper to get a nested value from an object using dot notation and array brackets + * e.g., getNestedValue({user: {id: "123"}}, "user.id") => "123" + * e.g., getNestedValue({items: [{name: "a"}]}, "items[0].name") => "a" + */ +function getNestedValue(obj: Record, path: string): unknown { + let current: unknown = obj; + + // Split by dots, but we need to handle array notation within each segment + const segments = path.split("."); + + for (const segment of segments) { + if (current === null || current === undefined) { + return undefined; + } + + // Check if segment contains array notation like "answers[0]" or just "[0]" + const arrayMatch = segment.match(/^([^\[]*)\[(\d+)\]$/); + + if (arrayMatch) { + const [, propertyName, indexStr] = arrayMatch; + const index = parseInt(indexStr, 10); + + // If there's a property name before the bracket, access it first + if (propertyName) { + if (typeof current !== "object") { + return undefined; + } + current = (current as Record)[propertyName]; + } + + // Now access the array index + if (!Array.isArray(current)) { + return undefined; + } + current = current[index]; + } else { + // Regular property access + if (typeof current !== "object") { + return undefined; + } + current = (current as Record)[segment]; + } + } + + return current; +} diff --git a/apps/web/lib/connector/webhook-listener-store.ts b/apps/web/lib/connector/webhook-listener-store.ts new file mode 100644 index 0000000000..7f56e208d9 --- /dev/null +++ b/apps/web/lib/connector/webhook-listener-store.ts @@ -0,0 +1,155 @@ +import "server-only"; +import { nanoid } from "nanoid"; + +// Session TTL in milliseconds (5 minutes) +const SESSION_TTL_MS = 5 * 60 * 1000; + +// Maximum payload size in bytes (100KB) +const MAX_PAYLOAD_SIZE = 100 * 1024; + +// Cleanup interval in milliseconds (1 minute) +const CLEANUP_INTERVAL_MS = 60 * 1000; + +interface WebhookSession { + payload: Record; + receivedAt: number; +} + +// In-memory store for webhook payloads +const sessionStore = new Map(); + +// Track if cleanup interval is running +let cleanupIntervalId: NodeJS.Timeout | null = null; + +/** + * Generate a unique session ID for webhook listening + */ +export function generateSessionId(): string { + return nanoid(21); +} + +/** + * Store a received webhook payload for a session + * @returns true if stored successfully, false if payload is too large or session doesn't exist + */ +export function storePayload(sessionId: string, payload: Record): boolean { + // Check payload size + const payloadStr = JSON.stringify(payload); + if (payloadStr.length > MAX_PAYLOAD_SIZE) { + return false; + } + + sessionStore.set(sessionId, { + payload, + receivedAt: Date.now(), + }); + + return true; +} + +/** + * Get the received payload for a session + * @param clear - If true, removes the payload after retrieval (default: true) + * @returns The payload if found, null otherwise + */ +export function getPayload(sessionId: string, clear: boolean = true): Record | null { + const session = sessionStore.get(sessionId); + + if (!session) { + return null; + } + + // Check if session has expired + if (Date.now() - session.receivedAt > SESSION_TTL_MS) { + sessionStore.delete(sessionId); + return null; + } + + const { payload } = session; + + if (clear) { + sessionStore.delete(sessionId); + } + + return payload; +} + +/** + * Check if a session exists (for validation) + */ +export function sessionExists(sessionId: string): boolean { + return sessionStore.has(sessionId); +} + +/** + * Create a new listening session + * @returns The session ID + */ +export function createSession(): string { + const sessionId = generateSessionId(); + // Initialize with empty marker to indicate session is active + // This doesn't store a payload yet, just marks the session as valid + return sessionId; +} + +/** + * Delete a session + */ +export function deleteSession(sessionId: string): void { + sessionStore.delete(sessionId); +} + +/** + * Clean up expired sessions + */ +export function cleanupExpiredSessions(): number { + const now = Date.now(); + let cleanedCount = 0; + + for (const [sessionId, session] of sessionStore.entries()) { + if (now - session.receivedAt > SESSION_TTL_MS) { + sessionStore.delete(sessionId); + cleanedCount++; + } + } + + return cleanedCount; +} + +/** + * Start the automatic cleanup interval + */ +export function startCleanupInterval(): void { + if (cleanupIntervalId) { + return; // Already running + } + + cleanupIntervalId = setInterval(() => { + cleanupExpiredSessions(); + }, CLEANUP_INTERVAL_MS); + + // Prevent the interval from keeping the process alive + if (cleanupIntervalId.unref) { + cleanupIntervalId.unref(); + } +} + +/** + * Stop the automatic cleanup interval + */ +export function stopCleanupInterval(): void { + if (cleanupIntervalId) { + clearInterval(cleanupIntervalId); + cleanupIntervalId = null; + } +} + +/** + * Get the current number of active sessions (for debugging/monitoring) + */ +export function getSessionCount(): number { + return sessionStore.size; +} + +// Start cleanup on module load +startCleanupInterval(); diff --git a/apps/web/lib/utils/helper.ts b/apps/web/lib/utils/helper.ts index bc4da7ece0..d65d512b02 100644 --- a/apps/web/lib/utils/helper.ts +++ b/apps/web/lib/utils/helper.ts @@ -2,6 +2,7 @@ import { ResourceNotFoundError } from "@formbricks/types/errors"; import { getActionClass, getApiKey, + getConnector, getContact, getEnvironment, getIntegration, @@ -329,3 +330,31 @@ export const isStringMatch = (query: string, value: string): boolean => { return valueModified.includes(queryModified); }; + +// Connector helpers +export const getOrganizationIdFromConnectorId = async (connectorId: string) => { + const connector = await getConnector(connectorId); + if (!connector) { + throw new ResourceNotFoundError("connector", connectorId); + } + + return await getOrganizationIdFromEnvironmentId(connector.environmentId); +}; + +export const getProjectIdFromConnectorId = async (connectorId: string) => { + const connector = await getConnector(connectorId); + if (!connector) { + throw new ResourceNotFoundError("connector", connectorId); + } + + return await getProjectIdFromEnvironmentId(connector.environmentId); +}; + +export const getEnvironmentIdFromConnectorId = async (connectorId: string) => { + const connector = await getConnector(connectorId); + if (!connector) { + throw new ResourceNotFoundError("connector", connectorId); + } + + return connector.environmentId; +}; diff --git a/apps/web/lib/utils/services.ts b/apps/web/lib/utils/services.ts index 42faad4a24..5b67f81dce 100644 --- a/apps/web/lib/utils/services.ts +++ b/apps/web/lib/utils/services.ts @@ -329,3 +329,25 @@ export const getSegment = reactCache(async (segmentId: string): Promise<{ enviro throw error; } }); + +export const getConnector = reactCache( + async (connectorId: string): Promise<{ environmentId: string } | null> => { + validateInputs([connectorId, ZId]); + try { + const connector = await prisma.connector.findUnique({ + where: { + id: connectorId, + }, + select: { environmentId: true }, + }); + + return connector; + } catch (error) { + if (error instanceof Prisma.PrismaClientKnownRequestError) { + throw new DatabaseError(error.message); + } + + throw error; + } + } +); diff --git a/packages/database/migration/20260203135615_added_connector_model/migration.sql b/packages/database/migration/20260203135615_added_connector_model/migration.sql new file mode 100644 index 0000000000..db64aae71d --- /dev/null +++ b/packages/database/migration/20260203135615_added_connector_model/migration.sql @@ -0,0 +1,81 @@ +-- CreateEnum +CREATE TYPE "public"."ConnectorType" AS ENUM ('formbricks', 'webhook', 'csv', 'email', 'slack'); + +-- CreateEnum +CREATE TYPE "public"."ConnectorStatus" AS ENUM ('active', 'paused', 'error'); + +-- CreateTable +CREATE TABLE "public"."Connector" ( + "id" TEXT NOT NULL, + "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updated_at" TIMESTAMP(3) NOT NULL, + "name" TEXT NOT NULL, + "type" "public"."ConnectorType" NOT NULL, + "status" "public"."ConnectorStatus" NOT NULL DEFAULT 'active', + "environmentId" TEXT NOT NULL, + "config" JSONB, + "last_sync_at" TIMESTAMP(3), + "error_message" TEXT, + + CONSTRAINT "Connector_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "public"."ConnectorFormbricksMapping" ( + "id" TEXT NOT NULL, + "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "connectorId" TEXT NOT NULL, + "surveyId" TEXT NOT NULL, + "elementId" TEXT NOT NULL, + "hubFieldType" TEXT NOT NULL, + "custom_field_label" TEXT, + + CONSTRAINT "ConnectorFormbricksMapping_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "public"."ConnectorFieldMapping" ( + "id" TEXT NOT NULL, + "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "connectorId" TEXT NOT NULL, + "source_field_id" TEXT NOT NULL, + "target_field_id" TEXT NOT NULL, + "static_value" TEXT, + + CONSTRAINT "ConnectorFieldMapping_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE INDEX "Connector_environmentId_idx" ON "public"."Connector"("environmentId"); + +-- CreateIndex +CREATE INDEX "Connector_type_idx" ON "public"."Connector"("type"); + +-- CreateIndex +CREATE INDEX "ConnectorFormbricksMapping_connectorId_idx" ON "public"."ConnectorFormbricksMapping"("connectorId"); + +-- CreateIndex +CREATE INDEX "ConnectorFormbricksMapping_surveyId_idx" ON "public"."ConnectorFormbricksMapping"("surveyId"); + +-- CreateIndex +CREATE UNIQUE INDEX "ConnectorFormbricksMapping_connectorId_surveyId_elementId_key" ON "public"."ConnectorFormbricksMapping"("connectorId", "surveyId", "elementId"); + +-- CreateIndex +CREATE INDEX "ConnectorFieldMapping_connectorId_idx" ON "public"."ConnectorFieldMapping"("connectorId"); + +-- CreateIndex +CREATE UNIQUE INDEX "ConnectorFieldMapping_connectorId_source_field_id_target_fi_key" ON "public"."ConnectorFieldMapping"("connectorId", "source_field_id", "target_field_id"); + +CREATE UNIQUE INDEX "Connector_environmentId_name_key" ON "public"."Connector"("environmentId", "name"); + +-- AddForeignKey +ALTER TABLE "public"."Connector" ADD CONSTRAINT "Connector_environmentId_fkey" FOREIGN KEY ("environmentId") REFERENCES "public"."Environment"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "public"."ConnectorFormbricksMapping" ADD CONSTRAINT "ConnectorFormbricksMapping_connectorId_fkey" FOREIGN KEY ("connectorId") REFERENCES "public"."Connector"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "public"."ConnectorFormbricksMapping" ADD CONSTRAINT "ConnectorFormbricksMapping_surveyId_fkey" FOREIGN KEY ("surveyId") REFERENCES "public"."Survey"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "public"."ConnectorFieldMapping" ADD CONSTRAINT "ConnectorFieldMapping_connectorId_fkey" FOREIGN KEY ("connectorId") REFERENCES "public"."Connector"("id") ON DELETE CASCADE ON UPDATE CASCADE; diff --git a/packages/database/schema.prisma b/packages/database/schema.prisma index 69699d1cfb..a70e2bb8e7 100644 --- a/packages/database/schema.prisma +++ b/packages/database/schema.prisma @@ -378,19 +378,20 @@ model Survey { /// [SurveySingleUse] singleUse Json? @default("{\"enabled\": false, \"isEncrypted\": true}") - isVerifyEmailEnabled Boolean @default(false) - isSingleResponsePerEmailEnabled Boolean @default(false) - isBackButtonHidden Boolean @default(false) - isCaptureIpEnabled Boolean @default(false) + isVerifyEmailEnabled Boolean @default(false) + isSingleResponsePerEmailEnabled Boolean @default(false) + isBackButtonHidden Boolean @default(false) + isCaptureIpEnabled Boolean @default(false) pin String? displayPercentage Decimal? languages SurveyLanguage[] showLanguageSwitch Boolean? followUps SurveyFollowUp[] /// [SurveyRecaptcha] - recaptcha Json? @default("{\"enabled\": false, \"threshold\":0.1}") + recaptcha Json? @default("{\"enabled\": false, \"threshold\":0.1}") /// [SurveyLinkMetadata] - metadata Json @default("{}") + metadata Json @default("{}") + connectorMappings ConnectorFormbricksMapping[] slug String? @unique @@ -581,6 +582,7 @@ model Environment { segments Segment[] integration Integration[] ApiKeyEnvironment ApiKeyEnvironment[] + connectors Connector[] @@index([projectId]) } @@ -984,3 +986,95 @@ model ProjectTeam { @@id([projectId, teamId]) @@index([teamId]) } + +enum ConnectorType { + formbricks + webhook + csv + email + slack +} + +enum ConnectorStatus { + active + paused + error +} + +/// Base connector for all integration types. +/// Connects external data sources to the Hub for feedback record creation. +/// +/// @property id - Unique identifier for the connector +/// @property name - Display name for the connector +/// @property type - Type of connector (formbricks, webhook, csv, email, slack) +/// @property status - Current state of the connector (active, paused, error) +/// @property environment - The environment this connector belongs to +/// @property config - Type-specific configuration (e.g., webhook secret, S3 config) +/// @property formbricksMappings - Element mappings for Formbricks connectors +/// @property fieldMappings - Field mappings for other connector types +model Connector { + id String @id @default(cuid()) + createdAt DateTime @default(now()) @map(name: "created_at") + updatedAt DateTime @updatedAt @map(name: "updated_at") + name String + type ConnectorType + status ConnectorStatus @default(active) + environmentId String + environment Environment @relation(fields: [environmentId], references: [id], onDelete: Cascade) + /// [ConnectorConfig] + config Json? + formbricksMappings ConnectorFormbricksMapping[] + fieldMappings ConnectorFieldMapping[] + lastSyncAt DateTime? @map(name: "last_sync_at") + errorMessage String? @map(name: "error_message") + + @@unique([environmentId, name]) + @@index([environmentId]) + @@index([type]) +} + +/// Maps survey elements to Hub FeedbackRecords for Formbricks connectors. +/// Each row represents one element that will create FeedbackRecords when answered. +/// +/// @property id - Unique identifier for the mapping +/// @property connector - The parent connector +/// @property survey - The survey containing the element +/// @property elementId - The element ID within the survey (from blocks[].elements[].id) +/// @property hubFieldType - The field_type to use in Hub (text, nps, rating, etc.) +/// @property customFieldLabel - Optional override for the element headline as field_label in Hub +model ConnectorFormbricksMapping { + id String @id @default(cuid()) + createdAt DateTime @default(now()) @map(name: "created_at") + connectorId String + connector Connector @relation(fields: [connectorId], references: [id], onDelete: Cascade) + surveyId String + survey Survey @relation(fields: [surveyId], references: [id], onDelete: Cascade) + elementId String + hubFieldType String + customFieldLabel String? @map(name: "custom_field_label") + + @@unique([connectorId, surveyId, elementId]) + @@index([connectorId]) + @@index([surveyId]) +} + +/// Generic field mapping for Webhook, CSV, Email, Slack connectors. +/// Maps source fields to Hub FeedbackRecord fields. +/// +/// @property id - Unique identifier for the mapping +/// @property connector - The parent connector +/// @property sourceFieldId - Field path for webhook (e.g., "user.id"), column name for CSV +/// @property targetFieldId - Hub field (collected_at, field_id, value_text, etc.) +/// @property staticValue - If set, use this value instead of reading from sourceFieldId +model ConnectorFieldMapping { + id String @id @default(cuid()) + createdAt DateTime @default(now()) @map(name: "created_at") + connectorId String + connector Connector @relation(fields: [connectorId], references: [id], onDelete: Cascade) + sourceFieldId String @map(name: "source_field_id") + targetFieldId String @map(name: "target_field_id") + staticValue String? @map(name: "static_value") + + @@unique([connectorId, sourceFieldId, targetFieldId]) + @@index([connectorId]) +} diff --git a/packages/types/connector.ts b/packages/types/connector.ts new file mode 100644 index 0000000000..7fa8a30573 --- /dev/null +++ b/packages/types/connector.ts @@ -0,0 +1,235 @@ +import { z } from "zod"; + +// Connector type enum +export const ZConnectorType = z.enum(["formbricks", "webhook", "csv", "email", "slack"]); +export type TConnectorType = z.infer; + +// Connector status enum +export const ZConnectorStatus = z.enum(["active", "paused", "error"]); +export type TConnectorStatus = z.infer; + +// Hub field types (from Hub OpenAPI spec) +export const ZHubFieldType = z.enum([ + "text", + "categorical", + "nps", + "csat", + "ces", + "rating", + "number", + "boolean", + "date", +]); +export type THubFieldType = z.infer; + +// Hub target fields for mapping +export const ZHubTargetField = z.enum([ + "collected_at", + "source_type", + "field_id", + "field_type", + "tenant_id", + "response_id", + "source_id", + "source_name", + "field_label", + "value_text", + "value_number", + "value_boolean", + "value_date", + "metadata", + "language", + "user_identifier", +]); +export type THubTargetField = z.infer; + +// Base connector schema +export const ZConnector = z.object({ + id: z.string().cuid2(), + createdAt: z.date(), + updatedAt: z.date(), + name: z.string().min(1), + type: ZConnectorType, + status: ZConnectorStatus, + environmentId: z.string().cuid2(), + config: z.record(z.unknown()).nullable(), + lastSyncAt: z.date().nullable(), + errorMessage: z.string().nullable(), +}); +export type TConnector = z.infer; + +// Formbricks element mapping +export const ZConnectorFormbricksMapping = z.object({ + id: z.string().cuid2(), + createdAt: z.date(), + connectorId: z.string().cuid2(), + surveyId: z.string().cuid2(), + elementId: z.string(), + hubFieldType: ZHubFieldType, + customFieldLabel: z.string().nullable(), +}); +export type TConnectorFormbricksMapping = z.infer; + +// Generic field mapping (for webhook, csv, email, slack) +export const ZConnectorFieldMapping = z.object({ + id: z.string().cuid2(), + createdAt: z.date(), + connectorId: z.string().cuid2(), + sourceFieldId: z.string(), + targetFieldId: ZHubTargetField, + staticValue: z.string().nullable(), +}); +export type TConnectorFieldMapping = z.infer; + +// Formbricks connector with mappings +export const ZFormbricksConnector = ZConnector.extend({ + type: z.literal("formbricks"), + formbricksMappings: z.array(ZConnectorFormbricksMapping), +}); +export type TFormbricksConnector = z.infer; + +// Webhook connector config +export const ZWebhookConnectorConfig = z.object({ + webhookSecret: z.string().optional(), + payloadSchema: z.record(z.unknown()).optional(), +}); +export type TWebhookConnectorConfig = z.infer; + +// Webhook connector with config and mappings +export const ZWebhookConnector = ZConnector.extend({ + type: z.literal("webhook"), + config: ZWebhookConnectorConfig.nullable(), + fieldMappings: z.array(ZConnectorFieldMapping), +}); +export type TWebhookConnector = z.infer; + +// CSV connector config +export const ZCSVConnectorConfig = z.object({ + importMode: z.enum(["manual", "s3"]), + s3Config: z + .object({ + bucketName: z.string(), + region: z.string(), + prefix: z.string().optional(), + }) + .optional(), +}); +export type TCSVConnectorConfig = z.infer; + +// CSV connector with config and mappings +export const ZCSVConnector = ZConnector.extend({ + type: z.literal("csv"), + config: ZCSVConnectorConfig.nullable(), + fieldMappings: z.array(ZConnectorFieldMapping), +}); +export type TCSVConnector = z.infer; + +// Email connector config +export const ZEmailConnectorConfig = z.object({ + parseMode: z.string().optional(), +}); +export type TEmailConnectorConfig = z.infer; + +// Email connector with config and mappings +export const ZEmailConnector = ZConnector.extend({ + type: z.literal("email"), + config: ZEmailConnectorConfig.nullable(), + fieldMappings: z.array(ZConnectorFieldMapping), +}); +export type TEmailConnector = z.infer; + +// Slack connector config +export const ZSlackConnectorConfig = z.object({ + channelId: z.string().optional(), +}); +export type TSlackConnectorConfig = z.infer; + +// Slack connector with config and mappings +export const ZSlackConnector = ZConnector.extend({ + type: z.literal("slack"), + config: ZSlackConnectorConfig.nullable(), + fieldMappings: z.array(ZConnectorFieldMapping), +}); +export type TSlackConnector = z.infer; + +// Union type for any connector with its mappings +export type TConnectorWithMappings = + | TFormbricksConnector + | TWebhookConnector + | TCSVConnector + | TEmailConnector + | TSlackConnector; + +// UI helper types - Formbricks mapping with joined survey/element data +export const ZConnectorFormbricksMappingWithDetails = ZConnectorFormbricksMapping.extend({ + survey: z.object({ + id: z.string(), + name: z.string(), + status: z.string(), + }), + element: z.object({ + id: z.string(), + headline: z.string(), + type: z.string(), + }), +}); +export type TConnectorFormbricksMappingWithDetails = z.infer; + +// Create input schemas +export const ZConnectorCreateInput = z.object({ + name: z.string().min(1), + type: ZConnectorType, + config: z.record(z.unknown()).optional(), +}); +export type TConnectorCreateInput = z.infer; + +// Create Formbricks mapping input +export const ZConnectorFormbricksMappingCreateInput = z.object({ + surveyId: z.string().cuid2(), + elementId: z.string(), + hubFieldType: ZHubFieldType, + customFieldLabel: z.string().optional(), +}); +export type TConnectorFormbricksMappingCreateInput = z.infer; + +// Create field mapping input +export const ZConnectorFieldMappingCreateInput = z.object({ + sourceFieldId: z.string(), + targetFieldId: ZHubTargetField, + staticValue: z.string().optional(), +}); +export type TConnectorFieldMappingCreateInput = z.infer; + +// Update connector input +export const ZConnectorUpdateInput = z.object({ + name: z.string().min(1).optional(), + status: ZConnectorStatus.optional(), + config: z.record(z.unknown()).optional(), + errorMessage: z.string().nullable().optional(), + lastSyncAt: z.date().nullable().optional(), +}); +export type TConnectorUpdateInput = z.infer; + +// Element type to Hub field type mapping helper +export const ELEMENT_TYPE_TO_HUB_FIELD_TYPE: Record = { + openText: "text", + nps: "nps", + rating: "rating", + multipleChoiceSingle: "categorical", + multipleChoiceMulti: "categorical", + date: "date", + consent: "boolean", + matrix: "categorical", + ranking: "categorical", + pictureSelection: "categorical", + contactInfo: "text", + address: "text", + fileUpload: "text", + cal: "text", + cta: "boolean", +}; + +// Helper function to get Hub field type from element type +export const getHubFieldTypeFromElementType = (elementType: string): THubFieldType => { + return ELEMENT_TYPE_TO_HUB_FIELD_TYPE[elementType] || "text"; +};