Files
matt d6f8be2c0f Feat: OLAP Table for CEL Eval Failures (#2012)
* feat: add table, wire up partitioning

* feat: wire failures into the OLAP db from rabbit

* feat: bubble failures up to controller

* fix: naming

* fix: hack around enum type

* fix: typo

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* fix: typos

* fix: migration name

* feat: log debug failure

* feat: pub message from debug endpoint to log failure

* fix: error handling

* fix: use ingestor

* fix: olap suffix

* fix: pass source through

* fix: don't log ingest failure

* fix: rm debug as enum opt

* chore: gen

* Feat: Webhooks (#1978)

* feat: migration + go gen

* feat: non unique source name

* feat: api types

* fix: rm cruft

* feat: initial api for webhooks

* feat: handle encryption of incoming keys

* fix: nil pointer errors

* fix: import

* feat: add endpoint for incoming webhooks

* fix: naming

* feat: start wiring up basic auth

* feat: wire up cel event parsing

* feat: implement authentication

* fix: hack for plain text content

* feat: add source to enum

* feat: add source name enum

* feat: db source name enum fix

* fix: use source name enums

* feat: nest sources

* feat: first pass at stripe

* fix: clean up source name passing

* fix: use unique name for webhook

* feat: populator test

* fix: null values

* fix: ordering

* fix: rm unnecessary index

* fix: validation

* feat: validation on create

* fix: lint

* fix: naming

* feat: wire triggering webhook name through to events table

* feat: cleanup + python gen + e2e test for basic auth

* feat: query to insert webhook validation errors

* refactor: auth handler

* fix: naming

* refactor: validation errors, part II

* feat: wire up writes through olap

* fix: linting, fallthrough case

* fix: validation

* feat: tests for failure cases for basic auth

* feat: expand tests

* fix: correctly return 404 out of task getter

* chore: generated stuff

* fix: rm cruft

* fix: longer sleep

* debug: print name + events to logs

* feat: limit to N

* feat: add limit env var

* debug: ci test

* fix: apply namespaces to keys

* fix: namespacing, part ii

* fix: sdk config

* fix: handle prefixing

* feat: handle partitioning logic

* chore: gen

* feat: add webhook limit

* feat: wire up limits

* fix: gen

* fix: reverse order of generic fallthrough

* fix: comment for potential unexpected behavior

* fix: add check constraints, improve error handling

* chore: gen

* chore: gen

* fix: improve naming

* feat: scaffold webhooks page

* feat: sidebar

* feat: first pass at page

* feat: improve feedback on UI

* feat: initial work on create modal

* feat: change default to basic

* fix: openapi spec discriminated union

* fix: go side

* feat: start wiring up placeholders for stripe and github

* feat: pre-populated fields for Stripe + Github

* feat: add name section

* feat: copy improvements, show URL

* feat: UI cleanup

* fix: check if tenant populator errors

* feat: add comments

* chore: gen again

* fix: default name

* fix: styling

* fix: improve stripe header processing

* feat: docs, part 1

* fix: lint

* fix: migration order

* feat: implement rate limit per-webhook

* feat: comment

* feat: clean up docs

* chore: gen

* fix: migration versions

* fix: olap naming

* fix: partitions

* chore: gen

* feat: store webhook cel eval failures properly

* fix: pk order

* fix: auth tweaks, move fetches out of populator

* fix: pgtype.Text instead of string pointer

* chore: gen

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-07-30 13:27:38 -04:00

76 lines
1.6 KiB
Go

package ingestors
import (
"encoding/json"
"fmt"
"io"
"github.com/labstack/echo/v4"
"github.com/hatchet-dev/hatchet/api/v1/server/oas/gen"
"github.com/hatchet-dev/hatchet/internal/integrations/ingestors/sns"
)
// SnsUpdate handles an inbound SNS HTTP delivery addressed to a tenant.
//
// The raw request body is decoded as JSON into an sns.Payload and checked with
// VerifyPayload. The handler then requires that an SNS integration is stored
// for the (tenant, topic ARN) pair and loads the tenant record. Finally it
// dispatches on the payload type:
//
//   - "SubscriptionConfirmation" calls payload.Subscribe.
//   - "UnsubscribeConfirmation" calls payload.Unsubscribe.
//   - anything else forwards the unmodified request body to the ingestor as an
//     event keyed by req.Event.
//
// Any failure along the way is returned as-is with a nil response; on success
// an empty 200 response is returned.
func (i *IngestorsService) SnsUpdate(ctx echo.Context, req gen.SnsUpdateRequestObject) (gen.SnsUpdateResponseObject, error) {
	httpReq := ctx.Request()
	reqCtx := httpReq.Context()

	// Keep the raw bytes around: they are both parsed below and, for ordinary
	// notifications, passed on verbatim as the event payload.
	raw, err := io.ReadAll(httpReq.Body)
	if err != nil {
		return nil, err
	}

	msg := new(sns.Payload)
	if err = json.Unmarshal(raw, msg); err != nil {
		return nil, err
	}

	if err = msg.VerifyPayload(); err != nil {
		return nil, err
	}

	tenantID := req.Tenant.String()

	// The tenant must have an integration registered for this topic ARN
	// before the message is acted upon.
	integration, err := i.config.APIRepository.SNS().GetSNSIntegration(reqCtx, tenantID, msg.TopicArn)
	if err != nil {
		return nil, err
	}
	if integration == nil {
		return nil, fmt.Errorf("SNS integration not found for tenant %s and topic ARN %s", tenantID, msg.TopicArn)
	}

	tenant, err := i.config.APIRepository.Tenant().GetTenantByID(reqCtx, tenantID)
	if err != nil {
		return nil, err
	}

	switch msg.Type {
	case "SubscriptionConfirmation":
		if _, err := msg.Subscribe(); err != nil {
			return nil, err
		}
	case "UnsubscribeConfirmation":
		if _, err := msg.Unsubscribe(); err != nil {
			return nil, err
		}
	default:
		// Every other message type is treated as an event to ingest.
		if _, err := i.config.Ingestor.IngestEvent(reqCtx, tenant, req.Event, raw, nil, nil, nil, nil); err != nil {
			return nil, err
		}
	}

	return gen.SnsUpdate200Response{}, nil
}