Files
hatchet/api/v1/server/run/run.go
matt d6f8be2c0f Feat: OLAP Table for CEL Eval Failures (#2012)
* feat: add table, wire up partitioning

* feat: wire failures into the OLAP db from rabbit

* feat: bubble failures up to controller

* fix: naming

* fix: hack around enum type

* fix: typo

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* fix: typos

* fix: migration name

* feat: log debug failure

* feat: pub message from debug endpoint to log failure

* fix: error handling

* fix: use ingestor

* fix: olap suffix

* fix: pass source through

* fix: dont log ingest failure

* fix: rm debug as enum opt

* chore: gen

* Feat: Webhooks (#1978)

* feat: migration + go gen

* feat: non unique source name

* feat: api types

* fix: rm cruft

* feat: initial api for webhooks

* feat: handle encryption of incoming keys

* fix: nil pointer errors

* fix: import

* feat: add endpoint for incoming webhooks

* fix: naming

* feat: start wiring up basic auth

* feat: wire up cel event parsing

* feat: implement authentication

* fix: hack for plain text content

* feat: add source to enum

* feat: add source name enum

* feat: db source name enum fix

* fix: use source name enums

* feat: nest sources

* feat: first pass at stripe

* fix: clean up source name passing

* fix: use unique name for webhook

* feat: populator test

* fix: null values

* fix: ordering

* fix: rm unnecessary index

* fix: validation

* feat: validation on create

* fix: lint

* fix: naming

* feat: wire triggering webhook name through to events table

* feat: cleanup + python gen + e2e test for basic auth

* feat: query to insert webhook validation errors

* refactor: auth handler

* fix: naming

* refactor: validation errors, part II

* feat: wire up writes through olap

* fix: linting, fallthrough case

* fix: validation

* feat: tests for failure cases for basic auth

* feat: expand tests

* fix: correctly return 404 out of task getter

* chore: generated stuff

* fix: rm cruft

* fix: longer sleep

* debug: print name + events to logs

* feat: limit to N

* feat: add limit env var

* debug: ci test

* fix: apply namespaces to keys

* fix: namespacing, part ii

* fix: sdk config

* fix: handle prefixing

* feat: handle partitioning logic

* chore: gen

* feat: add webhook limit

* feat: wire up limits

* fix: gen

* fix: reverse order of generic fallthrough

* fix: comment for potential unexpected behavior

* fix: add check constraints, improve error handling

* chore: gen

* chore: gen

* fix: improve naming

* feat: scaffold webhooks page

* feat: sidebar

* feat: first pass at page

* feat: improve feedback on UI

* feat: initial work on create modal

* feat: change default to basic

* fix: openapi spec discriminated union

* fix: go side

* feat: start wiring up placeholders for stripe and github

* feat: pre-populated fields for Stripe + Github

* feat: add name section

* feat: copy improvements, show URL

* feat: UI cleanup

* fix: check if tenant populator errors

* feat: add comments

* chore: gen again

* fix: default name

* fix: styling

* fix: improve stripe header processing

* feat: docs, part 1

* fix: lint

* fix: migration order

* feat: implement rate limit per-webhook

* feat: comment

* feat: clean up docs

* chore: gen

* fix: migration versions

* fix: olap naming

* fix: partitions

* chore: gen

* feat: store webhook cel eval failures properly

* fix: pk order

* fix: auth tweaks, move fetches out of populator

* fix: pgtype.Text instead of string pointer

* chore: gen

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-07-30 13:27:38 -04:00

518 lines
16 KiB
Go

package run
import (
"context"
"errors"
"fmt"
"net/http"
"time"
"github.com/getkin/kin-openapi/openapi3"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
"github.com/rs/zerolog"
"github.com/hatchet-dev/hatchet/api/v1/server/authn"
"github.com/hatchet-dev/hatchet/api/v1/server/authz"
apitokens "github.com/hatchet-dev/hatchet/api/v1/server/handlers/api-tokens"
"github.com/hatchet-dev/hatchet/api/v1/server/handlers/events"
"github.com/hatchet-dev/hatchet/api/v1/server/handlers/info"
"github.com/hatchet-dev/hatchet/api/v1/server/handlers/ingestors"
"github.com/hatchet-dev/hatchet/api/v1/server/handlers/logs"
"github.com/hatchet-dev/hatchet/api/v1/server/handlers/metadata"
"github.com/hatchet-dev/hatchet/api/v1/server/handlers/monitoring"
rate_limits "github.com/hatchet-dev/hatchet/api/v1/server/handlers/rate-limits"
slackapp "github.com/hatchet-dev/hatchet/api/v1/server/handlers/slack-app"
stepruns "github.com/hatchet-dev/hatchet/api/v1/server/handlers/step-runs"
"github.com/hatchet-dev/hatchet/api/v1/server/handlers/tenants"
"github.com/hatchet-dev/hatchet/api/v1/server/handlers/users"
celv1 "github.com/hatchet-dev/hatchet/api/v1/server/handlers/v1/cel"
eventsv1 "github.com/hatchet-dev/hatchet/api/v1/server/handlers/v1/events"
filtersv1 "github.com/hatchet-dev/hatchet/api/v1/server/handlers/v1/filters"
"github.com/hatchet-dev/hatchet/api/v1/server/handlers/v1/tasks"
webhooksv1 "github.com/hatchet-dev/hatchet/api/v1/server/handlers/v1/webhooks"
workflowrunsv1 "github.com/hatchet-dev/hatchet/api/v1/server/handlers/v1/workflow-runs"
webhookworker "github.com/hatchet-dev/hatchet/api/v1/server/handlers/webhook-worker"
"github.com/hatchet-dev/hatchet/api/v1/server/handlers/workers"
workflowruns "github.com/hatchet-dev/hatchet/api/v1/server/handlers/workflow-runs"
"github.com/hatchet-dev/hatchet/api/v1/server/handlers/workflows"
"github.com/hatchet-dev/hatchet/api/v1/server/headers"
hatchetmiddleware "github.com/hatchet-dev/hatchet/api/v1/server/middleware"
"github.com/hatchet-dev/hatchet/api/v1/server/middleware/populator"
"github.com/hatchet-dev/hatchet/api/v1/server/oas/gen"
"github.com/hatchet-dev/hatchet/pkg/config/server"
"github.com/hatchet-dev/hatchet/pkg/repository/postgres/sqlchelpers"
"golang.org/x/time/rate"
)
// apiService bundles every handler service into one value by embedding them,
// so the methods of all embedded services are promoted onto apiService. The
// resulting value is what getCoreEchoService passes to gen.NewStrictHandler.
type apiService struct {
	// Handlers that live directly under server/handlers.
	*users.UserService
	*tenants.TenantService
	*events.EventService
	*rate_limits.RateLimitService
	*logs.LogService
	*workflows.WorkflowService
	*workers.WorkerService
	*metadata.MetadataService
	*apitokens.APITokenService
	*stepruns.StepRunService
	*ingestors.IngestorsService
	*slackapp.SlackAppService
	*webhookworker.WebhookWorkersService
	*workflowruns.WorkflowRunsService
	*monitoring.MonitoringService
	*info.InfoService

	// Handlers that live under server/handlers/v1.
	*tasks.TasksService
	*workflowrunsv1.V1WorkflowRunsService
	*eventsv1.V1EventsService
	*filtersv1.V1FiltersService
	*webhooksv1.V1WebhooksService
	*celv1.V1CELService
}
// newAPIService constructs every handler service from the same server config
// and returns them bundled in a single apiService.
func newAPIService(config *server.ServerConfig) *apiService {
	svc := new(apiService)

	// Services are built one at a time, in a fixed order, each from config.
	svc.UserService = users.NewUserService(config)
	svc.TenantService = tenants.NewTenantService(config)
	svc.EventService = events.NewEventService(config)
	svc.RateLimitService = rate_limits.NewRateLimitService(config)
	svc.LogService = logs.NewLogService(config)
	svc.WorkflowService = workflows.NewWorkflowService(config)
	svc.WorkflowRunsService = workflowruns.NewWorkflowRunsService(config)
	svc.WorkerService = workers.NewWorkerService(config)
	svc.MetadataService = metadata.NewMetadataService(config)
	svc.APITokenService = apitokens.NewAPITokenService(config)
	svc.StepRunService = stepruns.NewStepRunService(config)
	svc.IngestorsService = ingestors.NewIngestorsService(config)
	svc.SlackAppService = slackapp.NewSlackAppService(config)
	svc.WebhookWorkersService = webhookworker.NewWebhookWorkersService(config)
	svc.MonitoringService = monitoring.NewMonitoringService(config)
	svc.InfoService = info.NewInfoService(config)
	svc.TasksService = tasks.NewTasksService(config)
	svc.V1WorkflowRunsService = workflowrunsv1.NewV1WorkflowRunsService(config)
	svc.V1EventsService = eventsv1.NewV1EventsService(config)
	svc.V1FiltersService = filtersv1.NewV1FiltersService(config)
	svc.V1WebhooksService = webhooksv1.NewV1WebhooksService(config)
	svc.V1CELService = celv1.NewV1CELService(config)

	return svc
}
// APIServer wires the generated OpenAPI handlers, the hatchet middleware chain
// and any extensions into an echo server and runs it.
type APIServer struct {
	// config is the server configuration handed to every service, getter and
	// middleware created by this server.
	config *server.ServerConfig

	// additionalMiddlewares are extra middlewares, set via RunWithMiddlewares,
	// that run after the built-in hatchet middlewares.
	additionalMiddlewares []hatchetmiddleware.MiddlewareFunc
}
// NewAPIServer returns an APIServer that uses config. No additional
// middlewares are set; use RunWithMiddlewares to supply them.
func NewAPIServer(config *server.ServerConfig) *APIServer {
	srv := new(APIServer)
	srv.config = config

	return srv
}
// APIServerExtensionOpt describes an API extension. Given the server config it returns the
// extension's OpenAPI spec and a function that registers the extension's handlers on an echo
// group; that function also receives the populator that was set up for the group. A non-nil
// error from either the option or the register function makes Run return that error.
type APIServerExtensionOpt func(config *server.ServerConfig) (*openapi3.T, func(*echo.Group, *populator.Populator) error, error)
// Run builds the core echo service, mounts each extension in opts on its own
// echo group, and then starts the server via RunWithServer.
//
// It returns the cleanup function produced by RunWithServer, or the first
// error encountered while building the core service or mounting an extension.
func (t *APIServer) Run(opts ...APIServerExtensionOpt) (func() error, error) {
	e, err := t.getCoreEchoService()
	if err != nil {
		return nil, err
	}

	for _, ext := range opts {
		// every extension lives in its own echo group, which validates
		// requests against that extension's spec
		group := e.Group("")

		spec, register, err := ext(t.config)
		if err != nil {
			return nil, err
		}

		pop, err := t.registerSpec(group, spec, t.additionalMiddlewares)
		if err != nil {
			return nil, err
		}

		err = register(group, pop)
		if err != nil {
			return nil, err
		}
	}

	return t.RunWithServer(e)
}
// RunWithMiddlewares stores middlewares on the server as its additional
// middlewares and then behaves exactly like Run(opts...).
func (t *APIServer) RunWithMiddlewares(middlewares []hatchetmiddleware.MiddlewareFunc, opts ...APIServerExtensionOpt) (func() error, error) {
	// must be set before Run, which reads t.additionalMiddlewares while
	// registering the specs
	t.additionalMiddlewares = middlewares

	cleanup, err := t.Run(opts...)

	return cleanup, err
}
// RunWithServer logs every route registered on e at debug level, starts e in
// a background goroutine on the configured runtime port, and returns a cleanup
// function that shuts e down.
//
// If the server stops with any error other than http.ErrServerClosed, the
// background goroutine panics. The returned error is always nil.
func (t *APIServer) RunWithServer(e *echo.Echo) (func() error, error) {
	for _, r := range e.Routes() {
		t.config.Logger.Debug().Msgf("registered route: %s %s", r.Method, r.Path)
	}

	go func() {
		addr := fmt.Sprintf(":%d", t.config.Runtime.Port)

		err := e.Start(addr)
		if err == nil || errors.Is(err, http.ErrServerClosed) {
			// a nil error or a regular shutdown is not a failure
			return
		}

		panic(err)
	}()

	shutdown := func() error {
		return e.Shutdown(context.Background())
	}

	return shutdown, nil
}
// getCoreEchoService creates the echo instance for the core API: it loads the
// generated OpenAPI spec, registers the spec's middleware on a root group, and
// mounts the generated handlers backed by a fresh apiService.
func (t *APIServer) getCoreEchoService() (*echo.Echo, error) {
	spec, err := gen.GetSwagger()
	if err != nil {
		return nil, err
	}

	e := echo.New()
	e.HideBanner, e.HidePort = true, true

	root := e.Group("")

	// the populator returned by registerSpec is not needed for the core API
	_, err = t.registerSpec(root, spec, t.additionalMiddlewares)
	if err != nil {
		return nil, err
	}

	handlers := gen.NewStrictHandler(newAPIService(t.config))
	gen.RegisterHandlers(root, handlers)

	return e, nil
}
// registerSpec installs the full middleware stack for spec on the echo group g.
//
// It creates a populator with a getter for every resource type, builds the
// hatchet middleware chain for spec (headers, populator, authn, authz,
// followed by the supplied middlewares, in that order), and registers on g:
// the request logger, echo's Recover middleware, the hatchet chain and the
// webhook rate limiter.
//
// Parameters:
//   - g: the echo group the middleware is attached to.
//   - spec: the OpenAPI spec the hatchet middleware handler is built from.
//   - middlewares: extra middlewares appended after the built-in hatchet ones.
//
// It returns the populator that was created, or an error if the hatchet
// middleware handler could not be built from spec.
func (t *APIServer) registerSpec(g *echo.Group, spec *openapi3.T, middlewares []hatchetmiddleware.MiddlewareFunc) (*populator.Populator, error) {
	// application middleware
	populatorMW := populator.NewPopulator(t.config)

	populatorMW.RegisterGetter("tenant", func(config *server.ServerConfig, parentId, id string) (result interface{}, uniqueParentId string, err error) {
		ctxTimeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		tenant, err := config.APIRepository.Tenant().GetTenantByID(ctxTimeout, id)
		if err != nil {
			return nil, "", err
		}

		// a tenant has no parent resource
		return tenant, "", nil
	})

	populatorMW.RegisterGetter("api-token", func(config *server.ServerConfig, parentId, id string) (result interface{}, uniqueParentId string, err error) {
		ctxTimeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		apiToken, err := config.APIRepository.APIToken().GetAPITokenById(ctxTimeout, id)
		if err != nil {
			return nil, "", err
		}

		// at the moment, API tokens should have a tenant id, because there are no other types of
		// API tokens. If we add other types of API tokens, we'll need to pass in a parent id to query
		// for.
		if !apiToken.TenantId.Valid {
			return nil, "", fmt.Errorf("api token has no tenant id")
		}

		return apiToken, sqlchelpers.UUIDToStr(apiToken.TenantId), nil
	})

	populatorMW.RegisterGetter("tenant-invite", func(config *server.ServerConfig, parentId, id string) (result interface{}, uniqueParentId string, err error) {
		timeoutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		tenantInvite, err := config.APIRepository.TenantInvite().GetTenantInvite(timeoutCtx, id)
		if err != nil {
			return nil, "", err
		}

		return tenantInvite, sqlchelpers.UUIDToStr(tenantInvite.TenantId), nil
	})

	populatorMW.RegisterGetter("slack", func(config *server.ServerConfig, parentId, id string) (result interface{}, uniqueParentId string, err error) {
		timeoutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		slackWebhook, err := config.APIRepository.Slack().GetSlackWebhookById(timeoutCtx, id)
		if err != nil {
			return nil, "", err
		}

		return slackWebhook, sqlchelpers.UUIDToStr(slackWebhook.TenantId), nil
	})

	populatorMW.RegisterGetter("alert-email-group", func(config *server.ServerConfig, parentId, id string) (result interface{}, uniqueParentId string, err error) {
		timeoutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		emailGroup, err := config.APIRepository.TenantAlertingSettings().GetTenantAlertGroupById(timeoutCtx, id)
		if err != nil {
			return nil, "", err
		}

		return emailGroup, sqlchelpers.UUIDToStr(emailGroup.TenantId), nil
	})

	populatorMW.RegisterGetter("sns", func(config *server.ServerConfig, parentId, id string) (result interface{}, uniqueParentId string, err error) {
		timeoutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		snsIntegration, err := config.APIRepository.SNS().GetSNSIntegrationById(timeoutCtx, id)
		if err != nil {
			return nil, "", err
		}

		return snsIntegration, sqlchelpers.UUIDToStr(snsIntegration.TenantId), nil
	})

	populatorMW.RegisterGetter("workflow", func(config *server.ServerConfig, parentId, id string) (result interface{}, uniqueParentId string, err error) {
		workflow, err := config.APIRepository.Workflow().GetWorkflowById(context.Background(), id)
		if err != nil {
			return nil, "", err
		}

		return workflow, sqlchelpers.UUIDToStr(workflow.Workflow.TenantId), nil
	})

	populatorMW.RegisterGetter("workflow-run", func(config *server.ServerConfig, parentId, id string) (result interface{}, uniqueParentId string, err error) {
		workflowRun, err := config.APIRepository.WorkflowRun().GetWorkflowRunById(context.Background(), parentId, id)
		if err != nil {
			return nil, "", err
		}

		return workflowRun, sqlchelpers.UUIDToStr(workflowRun.TenantId), nil
	})

	populatorMW.RegisterGetter("scheduled-workflow-run", func(config *server.ServerConfig, parentId, id string) (result interface{}, uniqueParentId string, err error) {
		scheduled, err := config.APIRepository.WorkflowRun().GetScheduledWorkflow(context.Background(), parentId, id)
		if err != nil {
			return nil, "", err
		}

		if scheduled == nil {
			return nil, "", echo.NewHTTPError(http.StatusNotFound, "scheduled workflow run not found")
		}

		return scheduled, sqlchelpers.UUIDToStr(scheduled.TenantId), nil
	})

	populatorMW.RegisterGetter("cron-workflow", func(config *server.ServerConfig, parentId, id string) (result interface{}, uniqueParentId string, err error) {
		scheduled, err := config.APIRepository.Workflow().GetCronWorkflow(context.Background(), parentId, id)
		if err != nil {
			return nil, "", err
		}

		if scheduled == nil {
			return nil, "", echo.NewHTTPError(http.StatusNotFound, "cron workflow not found")
		}

		return scheduled, sqlchelpers.UUIDToStr(scheduled.TenantId), nil
	})

	populatorMW.RegisterGetter("step-run", func(config *server.ServerConfig, parentId, id string) (result interface{}, uniqueParentId string, err error) {
		stepRun, err := config.APIRepository.StepRun().GetStepRunById(id)
		if err != nil {
			return nil, "", err
		}

		// the lookup is by id only, so reject a step run that belongs to a
		// different tenant than the one named in the request
		if parentId != "" && sqlchelpers.UUIDToStr(stepRun.TenantId) != parentId {
			return nil, "", fmt.Errorf("tenant id mismatch when populating step run")
		}

		return stepRun, sqlchelpers.UUIDToStr(stepRun.TenantId), nil
	})

	populatorMW.RegisterGetter("event", func(config *server.ServerConfig, parentId, id string) (result interface{}, uniqueParentId string, err error) {
		timeoutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		event, err := config.APIRepository.Event().GetEventById(timeoutCtx, id)
		if err != nil {
			return nil, "", err
		}

		return event, sqlchelpers.UUIDToStr(event.TenantId), nil
	})

	populatorMW.RegisterGetter("worker", func(config *server.ServerConfig, parentId, id string) (result interface{}, uniqueParentId string, err error) {
		worker, err := config.APIRepository.Worker().GetWorkerById(id)
		if err != nil {
			return nil, "", err
		}

		return worker, sqlchelpers.UUIDToStr(worker.Worker.TenantId), nil
	})

	populatorMW.RegisterGetter("webhook", func(config *server.ServerConfig, parentId, id string) (result interface{}, uniqueParentId string, err error) {
		timeoutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		webhookWorker, err := config.APIRepository.WebhookWorker().GetWebhookWorkerByID(timeoutCtx, id)
		if err != nil {
			return nil, "", err
		}

		return webhookWorker, sqlchelpers.UUIDToStr(webhookWorker.TenantId), nil
	})

	populatorMW.RegisterGetter("task", func(config *server.ServerConfig, parentId, id string) (result interface{}, uniqueParentId string, err error) {
		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()

		task, err := config.V1.OLAP().ReadTaskRun(ctx, id)
		if err != nil {
			return nil, "", err
		}

		if task == nil {
			return nil, "", echo.NewHTTPError(http.StatusNotFound, "task not found")
		}

		return task, sqlchelpers.UUIDToStr(task.TenantID), nil
	})

	// NOTE(review): the three v1 getters below read from t.config rather than the
	// config argument passed to the getter — presumably the same value; confirm.
	populatorMW.RegisterGetter("v1-workflow-run", func(config *server.ServerConfig, parentId, id string) (result interface{}, uniqueParentId string, err error) {
		workflowRun, err := t.config.V1.OLAP().ReadWorkflowRun(context.Background(), sqlchelpers.UUIDFromStr(id))
		if err != nil {
			return nil, "", err
		}

		return workflowRun, sqlchelpers.UUIDToStr(workflowRun.WorkflowRun.TenantID), nil
	})

	populatorMW.RegisterGetter("v1-filter", func(config *server.ServerConfig, parentId, id string) (result interface{}, uniqueParentId string, err error) {
		filter, err := t.config.V1.Filters().GetFilter(
			context.Background(),
			parentId,
			id,
		)
		if err != nil {
			return nil, "", err
		}

		return filter, sqlchelpers.UUIDToStr(filter.TenantID), nil
	})

	populatorMW.RegisterGetter("v1-webhook", func(config *server.ServerConfig, parentId, id string) (result interface{}, uniqueParentId string, err error) {
		webhook, err := t.config.V1.Webhooks().GetWebhook(
			context.Background(),
			parentId,
			id,
		)
		if err != nil {
			return nil, "", err
		}

		return webhook, sqlchelpers.UUIDToStr(webhook.TenantID), nil
	})

	authnMW := authn.NewAuthN(t.config)
	authzMW := authz.NewAuthZ(t.config)

	mw, err := hatchetmiddleware.NewMiddlewareHandler(spec)
	if err != nil {
		return nil, err
	}

	mw.Use(headers.Middleware())
	mw.Use(populatorMW.Middleware)
	mw.Use(authnMW.Middleware)
	mw.Use(authzMW.Middleware)

	// honor the middlewares argument rather than reaching back into
	// t.additionalMiddlewares, so the function does what its signature says
	for _, m := range middlewares {
		mw.Use(m)
	}

	allHatchetMiddleware, err := mw.Middleware()
	if err != nil {
		return nil, err
	}

	loggerMiddleware := middleware.RequestLoggerWithConfig(middleware.RequestLoggerConfig{
		LogURI:       true,
		LogStatus:    true,
		LogError:     true,
		LogLatency:   true,
		LogRemoteIP:  true,
		LogHost:      true,
		LogMethod:    true,
		LogURIPath:   true,
		LogUserAgent: true,
		LogValuesFunc: func(c echo.Context, v middleware.RequestLoggerValues) error {
			statusCode := v.Status

			// note that the status code is not set yet as it gets picked up by the global err handler
			// see here: https://github.com/labstack/echo/issues/2310#issuecomment-1288196898
			if v.Error != nil && statusCode == 200 {
				statusCode = 500
			}

			// pick the log level from the status class: 5xx error, 4xx warn, otherwise info
			var e *zerolog.Event

			switch {
			case statusCode >= 500:
				e = t.config.Logger.Error().Err(v.Error)
			case statusCode >= 400:
				e = t.config.Logger.Warn()
			default:
				e = t.config.Logger.Info()
			}

			e.
				Dur("latency", v.Latency).
				Int("status", statusCode).
				Str("method", v.Method).
				Str("uri", v.URI).
				Str("user_agent", v.UserAgent).
				Str("remote_ip", v.RemoteIP).
				Str("host", v.Host).
				Msg("API")

			return nil
		},
	})

	// register echo middleware
	g.Use(
		loggerMiddleware,
		middleware.Recover(),
		allHatchetMiddleware,
		hatchetmiddleware.WebhookRateLimitMiddleware(
			rate.Limit(t.config.Runtime.WebhookRateLimit),
			t.config.Runtime.WebhookRateLimitBurst,
			t.config.Logger,
		),
	)

	return populatorMW, nil
}