diff --git a/Taskfile.yaml b/Taskfile.yaml index bd72892f9..c6576d262 100644 --- a/Taskfile.yaml +++ b/Taskfile.yaml @@ -66,7 +66,6 @@ tasks: - task: generate-sqlc seed-dev: cmds: - - sh ./hack/dev/run-go-with-env.sh run github.com/steebchen/prisma-client-go migrate dev --skip-generate - SEED_DEVELOPMENT=true sh ./hack/dev/run-go-with-env.sh run ./cmd/hatchet-admin seed start-dev: deps: diff --git a/api/v1/server/handlers/tenants/create.go b/api/v1/server/handlers/tenants/create.go index 35df2c95a..c4a8cb676 100644 --- a/api/v1/server/handlers/tenants/create.go +++ b/api/v1/server/handlers/tenants/create.go @@ -49,6 +49,10 @@ func (t *TenantService) TenantCreate(ctx echo.Context, request gen.TenantCreateR Name: request.Body.Name, } + if t.config.Runtime.Limits.DefaultTenantRetentionPeriod != "" { + createOpts.DataRetentionPeriod = &t.config.Runtime.Limits.DefaultTenantRetentionPeriod + } + // write the user to the db tenant, err := t.config.APIRepository.Tenant().CreateTenant(createOpts) diff --git a/frontend/docs/pages/self-hosting/_meta.json b/frontend/docs/pages/self-hosting/_meta.json index 7770617f3..25fa95dc3 100644 --- a/frontend/docs/pages/self-hosting/_meta.json +++ b/frontend/docs/pages/self-hosting/_meta.json @@ -1,21 +1,22 @@ { - "index": "Introduction", - "-- Docker": { - "type": "separator", - "title": "Docker" - }, - "hatchet-lite": "Hatchet Lite", - "docker-compose": "Docker Compose", - "-- Kubernetes": { - "type": "separator", - "title": "Kubernetes" - }, - "kubernetes-quickstart": "Quickstart", - "kubernetes-glasskube": "Installing with Glasskube", - "networking": "Networking", - "-- Managing Hatchet": { - "type": "separator", - "title": "Managing Hatchet" - }, - "configuration-options": "Configuration Options" + "index": "Introduction", + "-- Docker": { + "type": "separator", + "title": "Docker" + }, + "hatchet-lite": "Hatchet Lite", + "docker-compose": "Docker Compose", + "-- Kubernetes": { + "type": "separator", + "title": "Kubernetes" + }, + "kubernetes-quickstart": "Quickstart", + "kubernetes-glasskube": "Installing with Glasskube", + "networking": "Networking", + "-- Managing Hatchet": { + "type": "separator", + "title": "Managing Hatchet" + }, + "configuration-options": "Configuration Options", + "data-retention": "Data Retention" } diff --git a/frontend/docs/pages/self-hosting/configuration-options.mdx b/frontend/docs/pages/self-hosting/configuration-options.mdx index bf27411d9..65058b778 100644 --- a/frontend/docs/pages/self-hosting/configuration-options.mdx +++ b/frontend/docs/pages/self-hosting/configuration-options.mdx @@ -1,8 +1,6 @@ # Configuration Options -The Hatchet server and engine can be configured via `HATCHET_SERVER` environment variables. This document contains a list of all available options. - -This document outlines the environment variables used to configure the server. These variables are grouped based on the configuration sections they belong to. +The Hatchet server and engine can be configured via `SERVER` and `DATABASE` environment variables. This document contains a list of all available options. ## Runtime Configuration @@ -14,13 +12,59 @@ This document outlines the environment variables used to configure the server. 
T | `SERVER_GRPC_BIND_ADDRESS` | GRPC server bind address | `127.0.0.1` | | `SERVER_GRPC_BROADCAST_ADDRESS` | GRPC server broadcast address | `127.0.0.1:7070` | | `SERVER_GRPC_INSECURE` | Controls if the GRPC server is insecure | `false` | -| `SERVER_WORKER_ENABLED` | Whether the internal worker is enabled | `false` | +| `SERVER_SHUTDOWN_WAIT` | Shutdown wait duration | `20s` | +| `SERVER_ENFORCE_LIMITS` | Enforce tenant limits | `false` | +| `SERVER_ALLOW_SIGNUP` | Allow new tenant signups | `true` | +| `SERVER_ALLOW_INVITES` | Allow new invites | `true` | +| `SERVER_ALLOW_CREATE_TENANT` | Allow tenant creation | `true` | +| `SERVER_ALLOW_CHANGE_PASSWORD` | Allow password changes | `true` | -## Services Configuration +## Database Configuration -| Variable | Description | Default Value | -| ----------------- | ------------------------ | ------------------------------------------------------------------------------------------------ | -| `SERVER_SERVICES` | List of enabled services | `["health", "ticker", "grpc", "eventscontroller", "queue", "webhookscontroller", "heartbeater"]` | +| Variable | Description | Default Value | +| ---------------------------- | ------------------------ | ------------- | +| `DATABASE_POSTGRES_HOST` | PostgreSQL host | `127.0.0.1` | +| `DATABASE_POSTGRES_PORT` | PostgreSQL port | `5431` | +| `DATABASE_POSTGRES_USERNAME` | PostgreSQL username | `hatchet` | +| `DATABASE_POSTGRES_PASSWORD` | PostgreSQL password | `hatchet` | +| `DATABASE_POSTGRES_DB_NAME` | PostgreSQL database name | `hatchet` | +| `DATABASE_POSTGRES_SSL_MODE` | PostgreSQL SSL mode | `disable` | +| `DATABASE_MAX_CONNS` | Max database connections | `5` | +| `DATABASE_LOG_QUERIES` | Log database queries | `false` | +| `CACHE_DURATION` | Cache duration | `60s` | + +## Security Check Configuration + +| Variable | Description | Default Value | +| -------------------------------- | ----------------------- | ------------------------------ | +| `SERVER_SECURITY_CHECK_ENABLED` | Enable security check | `true` | +| `SERVER_SECURITY_CHECK_ENDPOINT` | Security check endpoint | `https://security.hatchet.run` | + +## Limit Configuration + +| Variable | Description | Default Value | +| ------------------------------------------------ | -------------------------------- | ------------- | +| `SERVER_LIMITS_DEFAULT_TENANT_RETENTION_PERIOD` | Default tenant retention period | `720h` | +| `SERVER_LIMITS_DEFAULT_WORKFLOW_RUN_LIMIT` | Default workflow run limit | `1000` | +| `SERVER_LIMITS_DEFAULT_WORKFLOW_RUN_ALARM_LIMIT` | Default workflow run alarm limit | `750` | +| `SERVER_LIMITS_DEFAULT_WORKFLOW_RUN_WINDOW` | Default workflow run window | `24h` | +| `SERVER_LIMITS_DEFAULT_WORKER_LIMIT` | Default worker limit | `4` | +| `SERVER_LIMITS_DEFAULT_WORKER_ALARM_LIMIT` | Default worker alarm limit | `2` | +| `SERVER_LIMITS_DEFAULT_EVENT_LIMIT` | Default event limit | `1000` | +| `SERVER_LIMITS_DEFAULT_EVENT_ALARM_LIMIT` | Default event alarm limit | `750` | +| `SERVER_LIMITS_DEFAULT_EVENT_WINDOW` | Default event window | `24h` | +| `SERVER_LIMITS_DEFAULT_CRON_LIMIT` | Default cron limit | `5` | +| `SERVER_LIMITS_DEFAULT_CRON_ALARM_LIMIT` | Default cron alarm limit | `2` | +| `SERVER_LIMITS_DEFAULT_SCHEDULE_LIMIT` | Default schedule limit | `1000` | +| `SERVER_LIMITS_DEFAULT_SCHEDULE_ALARM_LIMIT` | Default schedule alarm limit | `750` | + +## Alerting Configuration + +| Variable | Description | Default Value | +| ------------------------------------ | -------------------------- | ------------- | +| 
`SERVER_ALERTING_SENTRY_ENABLED` | Enable Sentry for alerting | | +| `SERVER_ALERTING_SENTRY_DSN` | Sentry DSN | | +| `SERVER_ALERTING_SENTRY_ENVIRONMENT` | Sentry environment | `development` | ## Encryption Configuration @@ -51,12 +95,17 @@ This document outlines the environment variables used to configure the server. T | `SERVER_AUTH_GOOGLE_CLIENT_ID` | Google auth client ID | | | `SERVER_AUTH_GOOGLE_CLIENT_SECRET` | Google auth client secret | | | `SERVER_AUTH_GOOGLE_SCOPES` | Google auth scopes | `["openid", "profile", "email"]` | +| `SERVER_AUTH_GITHUB_ENABLED` | Whether GitHub auth is enabled | `false` | +| `SERVER_AUTH_GITHUB_CLIENT_ID` | GitHub auth client ID | | +| `SERVER_AUTH_GITHUB_CLIENT_SECRET` | GitHub auth client secret | | +| `SERVER_AUTH_GITHUB_SCOPES` | GitHub auth scopes | `["read:user", "user:email"]` | ## Task Queue Configuration -| Variable | Description | Default Value | -| ------------------------------- | ------------ | -------------------------------------- | -| `SERVER_TASKQUEUE_RABBITMQ_URL` | RabbitMQ URL | `amqp://user:password@localhost:5672/` | +| Variable | Description | Default Value | +| ------------------------------ | ------------------ | -------------------------------------- | +| `SERVER_MSGQUEUE_KIND` | Message queue kind | | +| `SERVER_MSGQUEUE_RABBITMQ_URL` | RabbitMQ URL | `amqp://user:password@localhost:5672/` | ## TLS Configuration @@ -73,10 +122,12 @@ This document outlines the environment variables used to configure the server. T ## Logging Configuration -| Variable | Description | Default Value | -| ---------------------- | ------------- | ------------- | -| `SERVER_LOGGER_LEVEL` | Logger level | | -| `SERVER_LOGGER_FORMAT` | Logger format | | +| Variable | Description | Default Value | +| ------------------------ | ------------- | ------------- | +| `SERVER_LOGGER_LEVEL` | Logger level | | +| `SERVER_LOGGER_FORMAT` | Logger format | | +| `DATABASE_LOGGER_LEVEL` | Logger level | | +| `DATABASE_LOGGER_FORMAT` | Logger format | | ## OpenTelemetry Configuration @@ -85,16 +136,11 @@ This document outlines the environment variables used to configure the server. 
T | `SERVER_OTEL_SERVICE_NAME` | Service name for OpenTelemetry | | | `SERVER_OTEL_COLLECTOR_URL` | Collector URL for OpenTelemetry | | -## Version Control System (VCS) Configuration +## Tenant Alerting Configuration -| Variable | Description | Default Value | -| -------------------------------------- | ----------------------------- | ------------- | -| `SERVER_VCS_KIND` | Type of VCS | | -| `SERVER_VCS_GITHUB_ENABLED` | Whether GitHub is enabled | | -| `SERVER_VCS_GITHUB_APP_CLIENT_ID` | GitHub app client ID | | -| `SERVER_VCS_GITHUB_APP_CLIENT_SECRET` | GitHub app client secret | | -| `SERVER_VCS_GITHUB_APP_NAME` | GitHub app name | | -| `SERVER_VCS_GITHUB_APP_WEBHOOK_SECRET` | GitHub app webhook secret | | -| `SERVER_VCS_GITHUB_APP_WEBHOOK_URL` | GitHub app webhook URL | | -| `SERVER_VCS_GITHUB_APP_ID` | GitHub app ID | | -| `SERVER_VCS_GITHUB_APP_SECRET_PATH` | Path to the GitHub app secret | | +| Variable | Description | Default Value | +| -------------------------------------------- | -------------------------------- | ---------------------- | +| `SERVER_TENANT_ALERTING_SLACK_ENABLED` | Enable Slack for tenant alerting | | +| `SERVER_TENANT_ALERTING_SLACK_CLIENT_ID` | Slack client ID | | +| `SERVER_TENANT_ALERTING_SLACK_CLIENT_SECRET` | Slack client secret | | +| `SERVER_TENANT_ALERTING_SLACK_SCOPES` | Slack scopes | `["incoming-webhook"]` | diff --git a/frontend/docs/pages/self-hosting/data-retention.mdx b/frontend/docs/pages/self-hosting/data-retention.mdx new file mode 100644 index 000000000..0a82abb01 --- /dev/null +++ b/frontend/docs/pages/self-hosting/data-retention.mdx @@ -0,0 +1,9 @@ +# Data Retention + +In Hatchet engine version `0.36.0` and above, you can configure the default data retention per tenant for workflow runs and events. The default value is set to 30 days, which means that all workflow runs which were created over 30 days ago and are in a final state (i.e. completed or failed), and all events which were created over 30 days ago, will be deleted. 
+
+This can be configured by setting the following environment variable to a Go duration string:
+
+```sh
+SERVER_LIMITS_DEFAULT_TENANT_RETENTION_PERIOD=720h # 30 days
+```
diff --git a/internal/services/controllers/jobs/controller.go b/internal/services/controllers/jobs/controller.go
index 50fa150bd..101ffb34c 100644
--- a/internal/services/controllers/jobs/controller.go
+++ b/internal/services/controllers/jobs/controller.go
@@ -685,7 +685,7 @@ func (jc *JobsControllerImpl) runStepRunReassign(ctx context.Context, startedAt
         err = g.Wait()

         if err != nil {
-            jc.l.Err(err).Msg("could not run step run requeue")
+            jc.l.Err(err).Msg("could not run step run reassign")
         }
     }
 }
diff --git a/internal/services/controllers/workflows/controller.go b/internal/services/controllers/workflows/controller.go
index dfdb88d4b..02da09ed7 100644
--- a/internal/services/controllers/workflows/controller.go
+++ b/internal/services/controllers/workflows/controller.go
@@ -182,6 +182,30 @@ func (wc *WorkflowsControllerImpl) Start() (func() error, error) {
         return nil, fmt.Errorf("could not schedule get group key run reassign: %w", err)
     }

+    _, err = wc.s.NewJob(
+        gocron.DurationJob(time.Second*60),
+        gocron.NewTask(
+            wc.runDeleteExpiredWorkflowRuns(ctx),
+        ),
+    )
+
+    if err != nil {
+        cancel()
+        return nil, fmt.Errorf("could not schedule delete expired workflow runs: %w", err)
+    }
+
+    _, err = wc.s.NewJob(
+        gocron.DurationJob(time.Second*60),
+        gocron.NewTask(
+            wc.runDeleteExpiredEvents(ctx),
+        ),
+    )
+
+    if err != nil {
+        cancel()
+        return nil, fmt.Errorf("could not schedule delete expired events: %w", err)
+    }
+
     wc.s.Start()

     f := func(task *msgqueue.Message) error {
diff --git a/internal/services/controllers/workflows/queue.go b/internal/services/controllers/workflows/queue.go
index 5ffe03ad2..109ebf1d8 100644
--- a/internal/services/controllers/workflows/queue.go
+++ b/internal/services/controllers/workflows/queue.go
@@ -493,6 +493,139 @@ func (ec *WorkflowsControllerImpl) runGetGroupKeyRunReassignTenant(ctx context.C
     return g.Wait()
 }

+func (wc *WorkflowsControllerImpl) runDeleteExpiredWorkflowRuns(ctx context.Context) func() {
+    return func() {
+        ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
+        defer cancel()
+
+        wc.l.Debug().Msgf("workflows controller: deleting expired workflow runs")
+
+        // list all tenants
+        tenants, err := wc.repo.Tenant().ListTenantsByControllerPartition(ctx, wc.partitionId)
+
+        if err != nil {
+            wc.l.Err(err).Msg("could not list tenants")
+            return
+        }
+
+        g := new(errgroup.Group)
+
+        for i := range tenants {
+            index := i
+            g.Go(func() error {
+                return wc.runDeleteExpiredWorkflowRunsTenant(ctx, *tenants[index])
+            })
+        }
+
+        err = g.Wait()
+
+        if err != nil {
+            wc.l.Err(err).Msg("could not run delete expired workflow runs")
+        }
+    }
+}
+
+func (wc *WorkflowsControllerImpl) runDeleteExpiredWorkflowRunsTenant(ctx context.Context, tenant dbsqlc.Tenant) error {
+    ctx, span := telemetry.NewSpan(ctx, "delete-expired-workflow-runs")
+    defer span.End()
+
+    tenantId := sqlchelpers.UUIDToStr(tenant.ID)
+
+    createdBefore, err := getDataRetentionExpiredTime(tenant.DataRetentionPeriod)
+
+    if err != nil {
+        return fmt.Errorf("could not get data retention expired time: %w", err)
+    }
+
+    // keep deleting until the context is done
+    for {
+        select {
+        case <-ctx.Done():
+            return nil
+        default:
+        }
+
+        // delete expired workflow runs
+        _, remaining, err := wc.repo.WorkflowRun().DeleteExpiredWorkflowRuns(ctx, tenantId, []dbsqlc.WorkflowRunStatus{
+            dbsqlc.WorkflowRunStatusSUCCEEDED,
+            dbsqlc.WorkflowRunStatusFAILED,
+        }, createdBefore)
+
+        if err != nil {
+            return fmt.Errorf("could not delete expired workflow runs: %w", err)
+        }
+
+        if remaining == 0 {
+            return nil
+        }
+    }
+}
+
+func (wc *WorkflowsControllerImpl) runDeleteExpiredEvents(ctx context.Context) func() {
+    return func() {
+        ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
+        defer cancel()
+
+        wc.l.Debug().Msgf("workflows controller: deleting expired events")
+
+        // list all tenants
+        tenants, err := wc.repo.Tenant().ListTenantsByControllerPartition(ctx, wc.partitionId)
+
+        if err != nil {
+            wc.l.Err(err).Msg("could not list tenants")
+            return
+        }
+
+        g := new(errgroup.Group)
+
+        for i := range tenants {
+            index := i
+            g.Go(func() error {
+                return wc.runDeleteExpiredEventsTenant(ctx, *tenants[index])
+            })
+        }
+
+        err = g.Wait()
+
+        if err != nil {
+            wc.l.Err(err).Msg("could not run delete expired events")
+        }
+    }
+}
+
+func (wc *WorkflowsControllerImpl) runDeleteExpiredEventsTenant(ctx context.Context, tenant dbsqlc.Tenant) error {
+    ctx, span := telemetry.NewSpan(ctx, "delete-expired-events")
+    defer span.End()
+
+    tenantId := sqlchelpers.UUIDToStr(tenant.ID)
+
+    createdBefore, err := getDataRetentionExpiredTime(tenant.DataRetentionPeriod)
+
+    if err != nil {
+        return fmt.Errorf("could not get data retention expired time: %w", err)
+    }
+
+    // keep deleting until the context is done
+    for {
+        select {
+        case <-ctx.Done():
+            return nil
+        default:
+        }
+
+        // delete expired events
+        _, remaining, err := wc.repo.Event().DeleteExpiredEvents(ctx, tenantId, createdBefore)
+
+        if err != nil {
+            return fmt.Errorf("could not delete expired events: %w", err)
+        }
+
+        if remaining == 0 {
+            return nil
+        }
+    }
+}
+
 func (wc *WorkflowsControllerImpl) queueByCancelInProgress(ctx context.Context, tenantId, groupKey string, workflowVersion *dbsqlc.GetWorkflowVersionForEngineRow) error {
     ctx, span := telemetry.NewSpan(ctx, "queue-by-cancel-in-progress")
     defer span.End()
@@ -697,3 +830,13 @@ func getStepRunNotifyCancelTask(tenantId, stepRunId, reason string) *msgqueue.Me
         Retries: 3,
     }
 }
+
+func getDataRetentionExpiredTime(duration string) (time.Time, error) {
+    d, err := time.ParseDuration(duration)
+
+    if err != nil {
+        return time.Time{}, fmt.Errorf("could not parse duration: %w", err)
+    }
+
+    return time.Now().UTC().Add(-d), nil
+}
diff --git a/pkg/config/server/server.go b/pkg/config/server/server.go
index 3bd99fd80..97a3253e9 100644
--- a/pkg/config/server/server.go
+++ b/pkg/config/server/server.go
@@ -100,6 +100,8 @@ type SecurityCheckConfigFile struct {
 }

 type LimitConfigFile struct {
+    DefaultTenantRetentionPeriod string `mapstructure:"defaultTenantRetentionPeriod" json:"defaultTenantRetentionPeriod,omitempty" default:"720h"`
+
     DefaultWorkflowRunLimit int `mapstructure:"defaultWorkflowRunLimit" json:"defaultWorkflowRunLimit,omitempty" default:"1000"`
     DefaultWorkflowRunAlarmLimit int `mapstructure:"defaultWorkflowRunAlarmLimit" json:"defaultWorkflowRunAlarmLimit,omitempty" default:"750"`
     DefaultWorkflowRunWindow time.Duration `mapstructure:"defaultWorkflowRunWindow" json:"defaultWorkflowRunWindow,omitempty" default:"24h"`
@@ -371,6 +373,8 @@ func BindAllEnv(v *viper.Viper) {
     _ = v.BindEnv("securityCheck.endpoint", "SERVER_SECURITY_CHECK_ENDPOINT")

     // limit options
+    _ = v.BindEnv("runtime.limits.defaultTenantRetentionPeriod", "SERVER_LIMITS_DEFAULT_TENANT_RETENTION_PERIOD")
+
     _ = v.BindEnv("runtime.limits.defaultWorkflowRunLimit", "SERVER_LIMITS_DEFAULT_WORKFLOW_RUN_LIMIT")
     _ = v.BindEnv("runtime.limits.defaultWorkflowRunAlarmLimit",
"SERVER_LIMITS_DEFAULT_WORKFLOW_RUN_ALARM_LIMIT") _ = v.BindEnv("runtime.limits.defaultWorkflowRunWindow", "SERVER_LIMITS_DEFAULT_WORKFLOW_RUN_WINDOW") diff --git a/pkg/repository/event.go b/pkg/repository/event.go index 0772ad584..8f3f6b40a 100644 --- a/pkg/repository/event.go +++ b/pkg/repository/event.go @@ -2,6 +2,7 @@ package repository import ( "context" + "time" "github.com/hatchet-dev/hatchet/pkg/repository/prisma/db" "github.com/hatchet-dev/hatchet/pkg/repository/prisma/dbsqlc" @@ -85,4 +86,8 @@ type EventEngineRepository interface { GetEventForEngine(ctx context.Context, tenantId, id string) (*dbsqlc.Event, error) ListEventsByIds(ctx context.Context, tenantId string, ids []string) ([]*dbsqlc.Event, error) + + // DeleteExpiredEvents deletes events that were created before the given time. It returns the number of deleted events + // and the number of non-deleted events that match the conditions. + DeleteExpiredEvents(ctx context.Context, tenantId string, before time.Time) (int, int, error) } diff --git a/pkg/repository/prisma/db/db_gen.go b/pkg/repository/prisma/db/db_gen.go index 19e30ff9e..f0d5cf53e 100644 --- a/pkg/repository/prisma/db/db_gen.go +++ b/pkg/repository/prisma/db/db_gen.go @@ -245,6 +245,9 @@ model Tenant { workerPartition TenantWorkerPartition? @relation(fields: [workerPartitionId], references: [id], onDelete: SetNull, onUpdate: SetNull) workerPartitionId String? + // The data retention period for deletable resources. This is a Go duration string. + dataRetentionPeriod String @default("720h") + events Event[] workflows Workflow[] jobs Job[] @@ -2082,6 +2085,7 @@ const ( TenantScalarFieldEnumAnalyticsOptOut TenantScalarFieldEnum = "analyticsOptOut" TenantScalarFieldEnumControllerPartitionID TenantScalarFieldEnum = "controllerPartitionId" TenantScalarFieldEnumWorkerPartitionID TenantScalarFieldEnum = "workerPartitionId" + TenantScalarFieldEnumDataRetentionPeriod TenantScalarFieldEnum = "dataRetentionPeriod" TenantScalarFieldEnumAlertMemberEmails TenantScalarFieldEnum = "alertMemberEmails" ) @@ -2865,6 +2869,8 @@ const tenantFieldWorkerPartition tenantPrismaFields = "workerPartition" const tenantFieldWorkerPartitionID tenantPrismaFields = "workerPartitionId" +const tenantFieldDataRetentionPeriod tenantPrismaFields = "dataRetentionPeriod" + const tenantFieldEvents tenantPrismaFields = "events" const tenantFieldWorkflows tenantPrismaFields = "workflows" @@ -6829,6 +6835,7 @@ type InnerTenant struct { AnalyticsOptOut bool `json:"analyticsOptOut"` ControllerPartitionID *string `json:"controllerPartitionId,omitempty"` WorkerPartitionID *string `json:"workerPartitionId,omitempty"` + DataRetentionPeriod string `json:"dataRetentionPeriod"` AlertMemberEmails bool `json:"alertMemberEmails"` } @@ -6843,6 +6850,7 @@ type RawTenantModel struct { AnalyticsOptOut RawBoolean `json:"analyticsOptOut"` ControllerPartitionID *RawString `json:"controllerPartitionId,omitempty"` WorkerPartitionID *RawString `json:"workerPartitionId,omitempty"` + DataRetentionPeriod RawString `json:"dataRetentionPeriod"` AlertMemberEmails RawBoolean `json:"alertMemberEmails"` } @@ -27492,6 +27500,11 @@ type tenantQuery struct { // @optional WorkerPartitionID tenantQueryWorkerPartitionIDString + // DataRetentionPeriod + // + // @required + DataRetentionPeriod tenantQueryDataRetentionPeriodString + Events tenantQueryEventsRelations Workflows tenantQueryWorkflowsRelations @@ -30656,6 +30669,353 @@ func (r tenantQueryWorkerPartitionIDString) Field() tenantPrismaFields { return tenantFieldWorkerPartitionID } +// 
base struct +type tenantQueryDataRetentionPeriodString struct{} + +// Set the required value of DataRetentionPeriod +func (r tenantQueryDataRetentionPeriodString) Set(value string) tenantSetParam { + + return tenantSetParam{ + data: builder.Field{ + Name: "dataRetentionPeriod", + Value: value, + }, + } + +} + +// Set the optional value of DataRetentionPeriod dynamically +func (r tenantQueryDataRetentionPeriodString) SetIfPresent(value *String) tenantSetParam { + if value == nil { + return tenantSetParam{} + } + + return r.Set(*value) +} + +func (r tenantQueryDataRetentionPeriodString) Equals(value string) tenantWithPrismaDataRetentionPeriodEqualsParam { + + return tenantWithPrismaDataRetentionPeriodEqualsParam{ + data: builder.Field{ + Name: "dataRetentionPeriod", + Fields: []builder.Field{ + { + Name: "equals", + Value: value, + }, + }, + }, + } +} + +func (r tenantQueryDataRetentionPeriodString) EqualsIfPresent(value *string) tenantWithPrismaDataRetentionPeriodEqualsParam { + if value == nil { + return tenantWithPrismaDataRetentionPeriodEqualsParam{} + } + return r.Equals(*value) +} + +func (r tenantQueryDataRetentionPeriodString) Order(direction SortOrder) tenantDefaultParam { + return tenantDefaultParam{ + data: builder.Field{ + Name: "dataRetentionPeriod", + Value: direction, + }, + } +} + +func (r tenantQueryDataRetentionPeriodString) Cursor(cursor string) tenantCursorParam { + return tenantCursorParam{ + data: builder.Field{ + Name: "dataRetentionPeriod", + Value: cursor, + }, + } +} + +func (r tenantQueryDataRetentionPeriodString) In(value []string) tenantDefaultParam { + return tenantDefaultParam{ + data: builder.Field{ + Name: "dataRetentionPeriod", + Fields: []builder.Field{ + { + Name: "in", + Value: value, + }, + }, + }, + } +} + +func (r tenantQueryDataRetentionPeriodString) InIfPresent(value []string) tenantDefaultParam { + if value == nil { + return tenantDefaultParam{} + } + return r.In(value) +} + +func (r tenantQueryDataRetentionPeriodString) NotIn(value []string) tenantDefaultParam { + return tenantDefaultParam{ + data: builder.Field{ + Name: "dataRetentionPeriod", + Fields: []builder.Field{ + { + Name: "notIn", + Value: value, + }, + }, + }, + } +} + +func (r tenantQueryDataRetentionPeriodString) NotInIfPresent(value []string) tenantDefaultParam { + if value == nil { + return tenantDefaultParam{} + } + return r.NotIn(value) +} + +func (r tenantQueryDataRetentionPeriodString) Lt(value string) tenantDefaultParam { + return tenantDefaultParam{ + data: builder.Field{ + Name: "dataRetentionPeriod", + Fields: []builder.Field{ + { + Name: "lt", + Value: value, + }, + }, + }, + } +} + +func (r tenantQueryDataRetentionPeriodString) LtIfPresent(value *string) tenantDefaultParam { + if value == nil { + return tenantDefaultParam{} + } + return r.Lt(*value) +} + +func (r tenantQueryDataRetentionPeriodString) Lte(value string) tenantDefaultParam { + return tenantDefaultParam{ + data: builder.Field{ + Name: "dataRetentionPeriod", + Fields: []builder.Field{ + { + Name: "lte", + Value: value, + }, + }, + }, + } +} + +func (r tenantQueryDataRetentionPeriodString) LteIfPresent(value *string) tenantDefaultParam { + if value == nil { + return tenantDefaultParam{} + } + return r.Lte(*value) +} + +func (r tenantQueryDataRetentionPeriodString) Gt(value string) tenantDefaultParam { + return tenantDefaultParam{ + data: builder.Field{ + Name: "dataRetentionPeriod", + Fields: []builder.Field{ + { + Name: "gt", + Value: value, + }, + }, + }, + } +} + +func (r tenantQueryDataRetentionPeriodString) 
GtIfPresent(value *string) tenantDefaultParam { + if value == nil { + return tenantDefaultParam{} + } + return r.Gt(*value) +} + +func (r tenantQueryDataRetentionPeriodString) Gte(value string) tenantDefaultParam { + return tenantDefaultParam{ + data: builder.Field{ + Name: "dataRetentionPeriod", + Fields: []builder.Field{ + { + Name: "gte", + Value: value, + }, + }, + }, + } +} + +func (r tenantQueryDataRetentionPeriodString) GteIfPresent(value *string) tenantDefaultParam { + if value == nil { + return tenantDefaultParam{} + } + return r.Gte(*value) +} + +func (r tenantQueryDataRetentionPeriodString) Contains(value string) tenantDefaultParam { + return tenantDefaultParam{ + data: builder.Field{ + Name: "dataRetentionPeriod", + Fields: []builder.Field{ + { + Name: "contains", + Value: value, + }, + }, + }, + } +} + +func (r tenantQueryDataRetentionPeriodString) ContainsIfPresent(value *string) tenantDefaultParam { + if value == nil { + return tenantDefaultParam{} + } + return r.Contains(*value) +} + +func (r tenantQueryDataRetentionPeriodString) StartsWith(value string) tenantDefaultParam { + return tenantDefaultParam{ + data: builder.Field{ + Name: "dataRetentionPeriod", + Fields: []builder.Field{ + { + Name: "startsWith", + Value: value, + }, + }, + }, + } +} + +func (r tenantQueryDataRetentionPeriodString) StartsWithIfPresent(value *string) tenantDefaultParam { + if value == nil { + return tenantDefaultParam{} + } + return r.StartsWith(*value) +} + +func (r tenantQueryDataRetentionPeriodString) EndsWith(value string) tenantDefaultParam { + return tenantDefaultParam{ + data: builder.Field{ + Name: "dataRetentionPeriod", + Fields: []builder.Field{ + { + Name: "endsWith", + Value: value, + }, + }, + }, + } +} + +func (r tenantQueryDataRetentionPeriodString) EndsWithIfPresent(value *string) tenantDefaultParam { + if value == nil { + return tenantDefaultParam{} + } + return r.EndsWith(*value) +} + +func (r tenantQueryDataRetentionPeriodString) Mode(value QueryMode) tenantDefaultParam { + return tenantDefaultParam{ + data: builder.Field{ + Name: "dataRetentionPeriod", + Fields: []builder.Field{ + { + Name: "mode", + Value: value, + }, + }, + }, + } +} + +func (r tenantQueryDataRetentionPeriodString) ModeIfPresent(value *QueryMode) tenantDefaultParam { + if value == nil { + return tenantDefaultParam{} + } + return r.Mode(*value) +} + +func (r tenantQueryDataRetentionPeriodString) Not(value string) tenantDefaultParam { + return tenantDefaultParam{ + data: builder.Field{ + Name: "dataRetentionPeriod", + Fields: []builder.Field{ + { + Name: "not", + Value: value, + }, + }, + }, + } +} + +func (r tenantQueryDataRetentionPeriodString) NotIfPresent(value *string) tenantDefaultParam { + if value == nil { + return tenantDefaultParam{} + } + return r.Not(*value) +} + +// deprecated: Use StartsWith instead. + +func (r tenantQueryDataRetentionPeriodString) HasPrefix(value string) tenantDefaultParam { + return tenantDefaultParam{ + data: builder.Field{ + Name: "dataRetentionPeriod", + Fields: []builder.Field{ + { + Name: "starts_with", + Value: value, + }, + }, + }, + } +} + +// deprecated: Use StartsWithIfPresent instead. +func (r tenantQueryDataRetentionPeriodString) HasPrefixIfPresent(value *string) tenantDefaultParam { + if value == nil { + return tenantDefaultParam{} + } + return r.HasPrefix(*value) +} + +// deprecated: Use EndsWith instead. 
+ +func (r tenantQueryDataRetentionPeriodString) HasSuffix(value string) tenantDefaultParam { + return tenantDefaultParam{ + data: builder.Field{ + Name: "dataRetentionPeriod", + Fields: []builder.Field{ + { + Name: "ends_with", + Value: value, + }, + }, + }, + } +} + +// deprecated: Use EndsWithIfPresent instead. +func (r tenantQueryDataRetentionPeriodString) HasSuffixIfPresent(value *string) tenantDefaultParam { + if value == nil { + return tenantDefaultParam{} + } + return r.HasSuffix(*value) +} + +func (r tenantQueryDataRetentionPeriodString) Field() tenantPrismaFields { + return tenantFieldDataRetentionPeriod +} + // base struct type tenantQueryEventsEvent struct{} @@ -178450,6 +178810,7 @@ var tenantOutput = []builder.Output{ {Name: "analyticsOptOut"}, {Name: "controllerPartitionId"}, {Name: "workerPartitionId"}, + {Name: "dataRetentionPeriod"}, {Name: "alertMemberEmails"}, } @@ -179475,6 +179836,84 @@ func (p tenantWithPrismaWorkerPartitionIDEqualsUniqueParam) workerPartitionIDFie func (tenantWithPrismaWorkerPartitionIDEqualsUniqueParam) unique() {} func (tenantWithPrismaWorkerPartitionIDEqualsUniqueParam) equals() {} +type TenantWithPrismaDataRetentionPeriodEqualsSetParam interface { + field() builder.Field + getQuery() builder.Query + equals() + tenantModel() + dataRetentionPeriodField() +} + +type TenantWithPrismaDataRetentionPeriodSetParam interface { + field() builder.Field + getQuery() builder.Query + tenantModel() + dataRetentionPeriodField() +} + +type tenantWithPrismaDataRetentionPeriodSetParam struct { + data builder.Field + query builder.Query +} + +func (p tenantWithPrismaDataRetentionPeriodSetParam) field() builder.Field { + return p.data +} + +func (p tenantWithPrismaDataRetentionPeriodSetParam) getQuery() builder.Query { + return p.query +} + +func (p tenantWithPrismaDataRetentionPeriodSetParam) tenantModel() {} + +func (p tenantWithPrismaDataRetentionPeriodSetParam) dataRetentionPeriodField() {} + +type TenantWithPrismaDataRetentionPeriodWhereParam interface { + field() builder.Field + getQuery() builder.Query + tenantModel() + dataRetentionPeriodField() +} + +type tenantWithPrismaDataRetentionPeriodEqualsParam struct { + data builder.Field + query builder.Query +} + +func (p tenantWithPrismaDataRetentionPeriodEqualsParam) field() builder.Field { + return p.data +} + +func (p tenantWithPrismaDataRetentionPeriodEqualsParam) getQuery() builder.Query { + return p.query +} + +func (p tenantWithPrismaDataRetentionPeriodEqualsParam) tenantModel() {} + +func (p tenantWithPrismaDataRetentionPeriodEqualsParam) dataRetentionPeriodField() {} + +func (tenantWithPrismaDataRetentionPeriodSetParam) settable() {} +func (tenantWithPrismaDataRetentionPeriodEqualsParam) equals() {} + +type tenantWithPrismaDataRetentionPeriodEqualsUniqueParam struct { + data builder.Field + query builder.Query +} + +func (p tenantWithPrismaDataRetentionPeriodEqualsUniqueParam) field() builder.Field { + return p.data +} + +func (p tenantWithPrismaDataRetentionPeriodEqualsUniqueParam) getQuery() builder.Query { + return p.query +} + +func (p tenantWithPrismaDataRetentionPeriodEqualsUniqueParam) tenantModel() {} +func (p tenantWithPrismaDataRetentionPeriodEqualsUniqueParam) dataRetentionPeriodField() {} + +func (tenantWithPrismaDataRetentionPeriodEqualsUniqueParam) unique() {} +func (tenantWithPrismaDataRetentionPeriodEqualsUniqueParam) equals() {} + type TenantWithPrismaEventsEqualsSetParam interface { field() builder.Field getQuery() builder.Query diff --git a/pkg/repository/prisma/dbsqlc/events.sql 
b/pkg/repository/prisma/dbsqlc/events.sql index ae643e9ee..252af8a12 100644 --- a/pkg/repository/prisma/dbsqlc/events.sql +++ b/pkg/repository/prisma/dbsqlc/events.sql @@ -133,3 +133,25 @@ FROM WHERE "tenantId" = @tenantId::uuid AND "id" = ANY (sqlc.arg('ids')::uuid[]); + +-- name: DeleteExpiredEvents :one +WITH expired_events_count AS ( + SELECT COUNT(*) as count + FROM "Event" e1 + WHERE + e1."tenantId" = @tenantId::uuid AND + e1."createdAt" < @createdBefore::timestamp +), expired_events_with_limit AS ( + SELECT + "id" + FROM "Event" e2 + WHERE + e2."tenantId" = @tenantId::uuid AND + e2."createdAt" < @createdBefore::timestamp + ORDER BY "createdAt" ASC + LIMIT sqlc.arg('limit') +) +DELETE FROM "Event" +WHERE + "id" IN (SELECT "id" FROM expired_events_with_limit) +RETURNING (SELECT count FROM expired_events_count) as total, (SELECT count FROM expired_events_count) - (SELECT COUNT(*) FROM expired_events_with_limit) as remaining, (SELECT COUNT(*) FROM expired_events_with_limit) as deleted; diff --git a/pkg/repository/prisma/dbsqlc/events.sql.go b/pkg/repository/prisma/dbsqlc/events.sql.go index 52ce0d20a..3f9cd86b8 100644 --- a/pkg/repository/prisma/dbsqlc/events.sql.go +++ b/pkg/repository/prisma/dbsqlc/events.sql.go @@ -128,6 +128,48 @@ func (q *Queries) CreateEvent(ctx context.Context, db DBTX, arg CreateEventParam return &i, err } +const deleteExpiredEvents = `-- name: DeleteExpiredEvents :one +WITH expired_events_count AS ( + SELECT COUNT(*) as count + FROM "Event" e1 + WHERE + e1."tenantId" = $1::uuid AND + e1."createdAt" < $2::timestamp +), expired_events_with_limit AS ( + SELECT + "id" + FROM "Event" e2 + WHERE + e2."tenantId" = $1::uuid AND + e2."createdAt" < $2::timestamp + ORDER BY "createdAt" ASC + LIMIT $3 +) +DELETE FROM "Event" +WHERE + "id" IN (SELECT "id" FROM expired_events_with_limit) +RETURNING (SELECT count FROM expired_events_count) as total, (SELECT count FROM expired_events_count) - (SELECT COUNT(*) FROM expired_events_with_limit) as remaining, (SELECT COUNT(*) FROM expired_events_with_limit) as deleted +` + +type DeleteExpiredEventsParams struct { + Tenantid pgtype.UUID `json:"tenantid"` + Createdbefore pgtype.Timestamp `json:"createdbefore"` + Limit int32 `json:"limit"` +} + +type DeleteExpiredEventsRow struct { + Total int64 `json:"total"` + Remaining int32 `json:"remaining"` + Deleted int64 `json:"deleted"` +} + +func (q *Queries) DeleteExpiredEvents(ctx context.Context, db DBTX, arg DeleteExpiredEventsParams) (*DeleteExpiredEventsRow, error) { + row := db.QueryRow(ctx, deleteExpiredEvents, arg.Tenantid, arg.Createdbefore, arg.Limit) + var i DeleteExpiredEventsRow + err := row.Scan(&i.Total, &i.Remaining, &i.Deleted) + return &i, err +} + const getEventForEngine = `-- name: GetEventForEngine :one SELECT id, "createdAt", "updatedAt", "deletedAt", key, "tenantId", "replayedFromId", data, "additionalMetadata" diff --git a/pkg/repository/prisma/dbsqlc/models.go b/pkg/repository/prisma/dbsqlc/models.go index e016ebdb5..d3a28b1d6 100644 --- a/pkg/repository/prisma/dbsqlc/models.go +++ b/pkg/repository/prisma/dbsqlc/models.go @@ -876,6 +876,7 @@ type Tenant struct { AlertMemberEmails bool `json:"alertMemberEmails"` ControllerPartitionId pgtype.Text `json:"controllerPartitionId"` WorkerPartitionId pgtype.Text `json:"workerPartitionId"` + DataRetentionPeriod string `json:"dataRetentionPeriod"` } type TenantAlertEmailGroup struct { diff --git a/pkg/repository/prisma/dbsqlc/schema.sql b/pkg/repository/prisma/dbsqlc/schema.sql index 1db0bbc47..c970d7b40 100644 --- 
a/pkg/repository/prisma/dbsqlc/schema.sql +++ b/pkg/repository/prisma/dbsqlc/schema.sql @@ -362,6 +362,7 @@ CREATE TABLE "Tenant" ( "alertMemberEmails" BOOLEAN NOT NULL DEFAULT true, "controllerPartitionId" TEXT, "workerPartitionId" TEXT, + "dataRetentionPeriod" TEXT NOT NULL DEFAULT '720h', CONSTRAINT "Tenant_pkey" PRIMARY KEY ("id") ); diff --git a/pkg/repository/prisma/dbsqlc/tenants.sql b/pkg/repository/prisma/dbsqlc/tenants.sql index 084308b24..c02a60a4a 100644 --- a/pkg/repository/prisma/dbsqlc/tenants.sql +++ b/pkg/repository/prisma/dbsqlc/tenants.sql @@ -7,7 +7,7 @@ WITH active_controller_partitions AS ( WHERE "lastHeartbeat" > NOW() - INTERVAL '1 minute' ) -INSERT INTO "Tenant" ("id", "name", "slug", "controllerPartitionId") +INSERT INTO "Tenant" ("id", "name", "slug", "controllerPartitionId", "dataRetentionPeriod") VALUES ( sqlc.arg('id')::uuid, sqlc.arg('name')::text, @@ -20,7 +20,8 @@ VALUES ( ORDER BY random() LIMIT 1 - ) + ), + COALESCE(sqlc.narg('dataRetentionPeriod')::text, '720h') ) RETURNING *; diff --git a/pkg/repository/prisma/dbsqlc/tenants.sql.go b/pkg/repository/prisma/dbsqlc/tenants.sql.go index c1c28731f..1dad7b6b6 100644 --- a/pkg/repository/prisma/dbsqlc/tenants.sql.go +++ b/pkg/repository/prisma/dbsqlc/tenants.sql.go @@ -39,7 +39,7 @@ WITH active_controller_partitions AS ( WHERE "lastHeartbeat" > NOW() - INTERVAL '1 minute' ) -INSERT INTO "Tenant" ("id", "name", "slug", "controllerPartitionId") +INSERT INTO "Tenant" ("id", "name", "slug", "controllerPartitionId", "dataRetentionPeriod") VALUES ( $1::uuid, $2::text, @@ -52,19 +52,26 @@ VALUES ( ORDER BY random() LIMIT 1 - ) + ), + COALESCE($4::text, '720h') ) -RETURNING id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId" +RETURNING id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId", "dataRetentionPeriod" ` type CreateTenantParams struct { - ID pgtype.UUID `json:"id"` - Name string `json:"name"` - Slug string `json:"slug"` + ID pgtype.UUID `json:"id"` + Name string `json:"name"` + Slug string `json:"slug"` + DataRetentionPeriod pgtype.Text `json:"dataRetentionPeriod"` } func (q *Queries) CreateTenant(ctx context.Context, db DBTX, arg CreateTenantParams) (*Tenant, error) { - row := db.QueryRow(ctx, createTenant, arg.ID, arg.Name, arg.Slug) + row := db.QueryRow(ctx, createTenant, + arg.ID, + arg.Name, + arg.Slug, + arg.DataRetentionPeriod, + ) var i Tenant err := row.Scan( &i.ID, @@ -77,6 +84,7 @@ func (q *Queries) CreateTenant(ctx context.Context, db DBTX, arg CreateTenantPar &i.AlertMemberEmails, &i.ControllerPartitionId, &i.WorkerPartitionId, + &i.DataRetentionPeriod, ) return &i, err } @@ -295,7 +303,7 @@ func (q *Queries) GetTenantAlertingSettings(ctx context.Context, db DBTX, tenant const getTenantByID = `-- name: GetTenantByID :one SELECT - id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId" + id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId", "dataRetentionPeriod" FROM "Tenant" as tenants WHERE @@ -316,6 +324,7 @@ func (q *Queries) GetTenantByID(ctx context.Context, db DBTX, id pgtype.UUID) (* &i.AlertMemberEmails, &i.ControllerPartitionId, &i.WorkerPartitionId, + &i.DataRetentionPeriod, ) return &i, err } @@ -458,7 +467,7 @@ func (q *Queries) 
GetTenantWorkflowQueueMetrics(ctx context.Context, db DBTX, ar const listTenants = `-- name: ListTenants :many SELECT - id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId" + id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId", "dataRetentionPeriod" FROM "Tenant" as tenants ` @@ -483,6 +492,7 @@ func (q *Queries) ListTenants(ctx context.Context, db DBTX) ([]*Tenant, error) { &i.AlertMemberEmails, &i.ControllerPartitionId, &i.WorkerPartitionId, + &i.DataRetentionPeriod, ); err != nil { return nil, err } @@ -504,7 +514,7 @@ WITH update_partition AS ( "id" = $1::text ) SELECT - id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId" + id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId", "dataRetentionPeriod" FROM "Tenant" as tenants WHERE @@ -531,6 +541,7 @@ func (q *Queries) ListTenantsByControllerPartitionId(ctx context.Context, db DBT &i.AlertMemberEmails, &i.ControllerPartitionId, &i.WorkerPartitionId, + &i.DataRetentionPeriod, ); err != nil { return nil, err } @@ -552,7 +563,7 @@ WITH update_partition AS ( "id" = $1::text ) SELECT - id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId" + id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId", "dataRetentionPeriod" FROM "Tenant" as tenants WHERE @@ -579,6 +590,7 @@ func (q *Queries) ListTenantsByTenantWorkerPartitionId(ctx context.Context, db D &i.AlertMemberEmails, &i.ControllerPartitionId, &i.WorkerPartitionId, + &i.DataRetentionPeriod, ); err != nil { return nil, err } @@ -617,7 +629,7 @@ WHERE "controllerPartitionId" IS NULL OR "controllerPartitionId" NOT IN (SELECT "id" FROM active_partitions) ) -RETURNING id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId" +RETURNING id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId", "dataRetentionPeriod" ` func (q *Queries) RebalanceAllControllerPartitions(ctx context.Context, db DBTX) error { @@ -652,7 +664,7 @@ WHERE "workerPartitionId" IS NULL OR "workerPartitionId" NOT IN (SELECT "id" FROM active_partitions) ) -RETURNING id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId" +RETURNING id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId", "dataRetentionPeriod" ` func (q *Queries) RebalanceAllTenantWorkerPartitions(ctx context.Context, db DBTX) error { diff --git a/pkg/repository/prisma/dbsqlc/workflow_runs.sql b/pkg/repository/prisma/dbsqlc/workflow_runs.sql index 4744914a0..dedfec05f 100644 --- a/pkg/repository/prisma/dbsqlc/workflow_runs.sql +++ b/pkg/repository/prisma/dbsqlc/workflow_runs.sql @@ -599,3 +599,27 @@ WHERE (sqlc.narg('childKey')::text IS NULL AND "childIndex" = @childIndex) OR (sqlc.narg('childKey')::text IS NOT NULL AND "childKey" = sqlc.narg('childKey')::text) ); + +-- name: DeleteExpiredWorkflowRuns :one +WITH 
expired_runs_count AS ( + SELECT COUNT(*) as count + FROM "WorkflowRun" wr1 + WHERE + wr1."tenantId" = @tenantId::uuid AND + wr1."status" = ANY(cast(sqlc.narg('statuses')::text[] as "WorkflowRunStatus"[])) AND + wr1."createdAt" < @createdBefore::timestamp +), expired_runs_with_limit AS ( + SELECT + "id" + FROM "WorkflowRun" wr2 + WHERE + wr2."tenantId" = @tenantId::uuid AND + wr2."status" = ANY(cast(sqlc.narg('statuses')::text[] as "WorkflowRunStatus"[])) AND + wr2."createdAt" < @createdBefore::timestamp + ORDER BY "createdAt" ASC + LIMIT sqlc.arg('limit') +) +DELETE FROM "WorkflowRun" +WHERE + "id" IN (SELECT "id" FROM expired_runs_with_limit) +RETURNING (SELECT count FROM expired_runs_count) as total, (SELECT count FROM expired_runs_count) - (SELECT COUNT(*) FROM expired_runs_with_limit) as remaining, (SELECT COUNT(*) FROM expired_runs_with_limit) as deleted; diff --git a/pkg/repository/prisma/dbsqlc/workflow_runs.sql.go b/pkg/repository/prisma/dbsqlc/workflow_runs.sql.go index 603633854..c7c171fd2 100644 --- a/pkg/repository/prisma/dbsqlc/workflow_runs.sql.go +++ b/pkg/repository/prisma/dbsqlc/workflow_runs.sql.go @@ -490,6 +490,56 @@ func (q *Queries) CreateWorkflowRunTriggeredBy(ctx context.Context, db DBTX, arg return &i, err } +const deleteExpiredWorkflowRuns = `-- name: DeleteExpiredWorkflowRuns :one +WITH expired_runs_count AS ( + SELECT COUNT(*) as count + FROM "WorkflowRun" wr1 + WHERE + wr1."tenantId" = $1::uuid AND + wr1."status" = ANY(cast($2::text[] as "WorkflowRunStatus"[])) AND + wr1."createdAt" < $3::timestamp +), expired_runs_with_limit AS ( + SELECT + "id" + FROM "WorkflowRun" wr2 + WHERE + wr2."tenantId" = $1::uuid AND + wr2."status" = ANY(cast($2::text[] as "WorkflowRunStatus"[])) AND + wr2."createdAt" < $3::timestamp + ORDER BY "createdAt" ASC + LIMIT $4 +) +DELETE FROM "WorkflowRun" +WHERE + "id" IN (SELECT "id" FROM expired_runs_with_limit) +RETURNING (SELECT count FROM expired_runs_count) as total, (SELECT count FROM expired_runs_count) - (SELECT COUNT(*) FROM expired_runs_with_limit) as remaining, (SELECT COUNT(*) FROM expired_runs_with_limit) as deleted +` + +type DeleteExpiredWorkflowRunsParams struct { + Tenantid pgtype.UUID `json:"tenantid"` + Statuses []string `json:"statuses"` + Createdbefore pgtype.Timestamp `json:"createdbefore"` + Limit int32 `json:"limit"` +} + +type DeleteExpiredWorkflowRunsRow struct { + Total int64 `json:"total"` + Remaining int32 `json:"remaining"` + Deleted int64 `json:"deleted"` +} + +func (q *Queries) DeleteExpiredWorkflowRuns(ctx context.Context, db DBTX, arg DeleteExpiredWorkflowRunsParams) (*DeleteExpiredWorkflowRunsRow, error) { + row := db.QueryRow(ctx, deleteExpiredWorkflowRuns, + arg.Tenantid, + arg.Statuses, + arg.Createdbefore, + arg.Limit, + ) + var i DeleteExpiredWorkflowRunsRow + err := row.Scan(&i.Total, &i.Remaining, &i.Deleted) + return &i, err +} + const getChildWorkflowRun = `-- name: GetChildWorkflowRun :one SELECT "createdAt", "updatedAt", "deletedAt", "tenantId", "workflowVersionId", status, error, "startedAt", "finishedAt", "concurrencyGroupId", "displayName", id, "childIndex", "childKey", "parentId", "parentStepRunId", "additionalMetadata" diff --git a/pkg/repository/prisma/event.go b/pkg/repository/prisma/event.go index 25a5946cb..6c3ecdfc3 100644 --- a/pkg/repository/prisma/event.go +++ b/pkg/repository/prisma/event.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "time" "github.com/google/uuid" "github.com/jackc/pgx/v5" @@ -285,3 +286,21 @@ func (r *eventEngineRepository) ListEventsByIds(ctx 
context.Context, tenantId st Ids: pgIds, }) } + +func (r *eventEngineRepository) DeleteExpiredEvents(ctx context.Context, tenantId string, before time.Time) (int, int, error) { + resp, err := r.queries.DeleteExpiredEvents(ctx, r.pool, dbsqlc.DeleteExpiredEventsParams{ + Tenantid: sqlchelpers.UUIDFromStr(tenantId), + Createdbefore: sqlchelpers.TimestampFromTime(before), + Limit: 1000, + }) + + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return 0, 0, nil + } + + return 0, 0, err + } + + return int(resp.Deleted), int(resp.Remaining), nil +} diff --git a/pkg/repository/prisma/tenant.go b/pkg/repository/prisma/tenant.go index 3fcadba84..369d1dfba 100644 --- a/pkg/repository/prisma/tenant.go +++ b/pkg/repository/prisma/tenant.go @@ -51,6 +51,12 @@ func (r *tenantAPIRepository) CreateTenant(opts *repository.CreateTenantOpts) (* tenantId = *opts.ID } + var dataRetentionPeriod pgtype.Text + + if opts.DataRetentionPeriod != nil { + dataRetentionPeriod = sqlchelpers.TextFromStr(*opts.DataRetentionPeriod) + } + tx, err := r.pool.Begin(context.Background()) if err != nil { @@ -60,9 +66,10 @@ func (r *tenantAPIRepository) CreateTenant(opts *repository.CreateTenantOpts) (* defer deferRollback(context.Background(), r.l, tx.Rollback) createTenant, err := r.queries.CreateTenant(context.Background(), tx, dbsqlc.CreateTenantParams{ - ID: sqlchelpers.UUIDFromStr(tenantId), - Slug: opts.Slug, - Name: opts.Name, + ID: sqlchelpers.UUIDFromStr(tenantId), + Slug: opts.Slug, + Name: opts.Name, + DataRetentionPeriod: dataRetentionPeriod, }) if err != nil { diff --git a/pkg/repository/prisma/workflow_run.go b/pkg/repository/prisma/workflow_run.go index c58d1fcd9..18afe7438 100644 --- a/pkg/repository/prisma/workflow_run.go +++ b/pkg/repository/prisma/workflow_run.go @@ -242,6 +242,31 @@ func (w *workflowRunEngineRepository) CreateNewWorkflowRun(ctx context.Context, return id, nil } +func (w *workflowRunEngineRepository) DeleteExpiredWorkflowRuns(ctx context.Context, tenantId string, statuses []dbsqlc.WorkflowRunStatus, before time.Time) (int, int, error) { + paramStatuses := make([]string, 0) + + for _, status := range statuses { + paramStatuses = append(paramStatuses, string(status)) + } + + resp, err := w.queries.DeleteExpiredWorkflowRuns(ctx, w.pool, dbsqlc.DeleteExpiredWorkflowRunsParams{ + Tenantid: sqlchelpers.UUIDFromStr(tenantId), + Statuses: paramStatuses, + Createdbefore: sqlchelpers.TimestampFromTime(before), + Limit: 1000, + }) + + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return 0, 0, nil + } + + return 0, 0, err + } + + return int(resp.Deleted), int(resp.Remaining), nil +} + func listWorkflowRuns(ctx context.Context, pool *pgxpool.Pool, queries *dbsqlc.Queries, l *zerolog.Logger, tenantId string, opts *repository.ListWorkflowRunsOpts) (*repository.ListWorkflowRunsResult, error) { res := &repository.ListWorkflowRunsResult{} diff --git a/pkg/repository/tenant.go b/pkg/repository/tenant.go index bca61b736..2f7f9ddbb 100644 --- a/pkg/repository/tenant.go +++ b/pkg/repository/tenant.go @@ -16,6 +16,9 @@ type CreateTenantOpts struct { // (optional) the tenant ID ID *string `validate:"omitempty,uuid"` + + // (optional) the tenant data retention period + DataRetentionPeriod *string `validate:"omitempty,duration"` } type UpdateTenantOpts struct { diff --git a/pkg/repository/workflow_run.go b/pkg/repository/workflow_run.go index 7fb1c67a7..2d83e37e7 100644 --- a/pkg/repository/workflow_run.go +++ b/pkg/repository/workflow_run.go @@ -376,4 +376,8 @@ type WorkflowRunEngineRepository interface 
{ // GetWorkflowRunById returns a workflow run by id. GetWorkflowRunById(ctx context.Context, tenantId, runId string) (*dbsqlc.GetWorkflowRunRow, error) + + // DeleteExpiredWorkflowRuns deletes workflow runs that were created before the given time. It returns the number of deleted runs + // and the number of non-deleted runs that match the conditions. + DeleteExpiredWorkflowRuns(ctx context.Context, tenantId string, statuses []dbsqlc.WorkflowRunStatus, before time.Time) (int, int, error) } diff --git a/pkg/validator/default.go b/pkg/validator/default.go index f1a740339..631a9ef6c 100644 --- a/pkg/validator/default.go +++ b/pkg/validator/default.go @@ -18,7 +18,7 @@ const ( HatchetNameErr = "Hatchet names must match the regex ^[a-zA-Z0-9\\.\\-_]+$" ActionIDErr = "Invalid action ID. Action IDs must be in the format :" CronErr = "Invalid cron expression" - DurationErr = "Invalid duration. Durations must be in the format , where unit is one of: 's', 'm', 'h', 'd', 'w', 'M', 'y'" + DurationErr = "Invalid duration. Durations must be in the format , where unit is one of: 's', 'm', 'h'" ) type APIErrors gen.APIErrors diff --git a/prisma/migrations/20240704211308_v0_35_2/migration.sql b/prisma/migrations/20240704211308_v0_35_2/migration.sql new file mode 100644 index 000000000..3683f5d1e --- /dev/null +++ b/prisma/migrations/20240704211308_v0_35_2/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "Tenant" ADD COLUMN "dataRetentionPeriod" TEXT NOT NULL DEFAULT '720h'; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index d2d430585..8202988b6 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -171,6 +171,9 @@ model Tenant { workerPartition TenantWorkerPartition? @relation(fields: [workerPartitionId], references: [id], onDelete: SetNull, onUpdate: SetNull) workerPartitionId String? + // The data retention period for deletable resources. This is a Go duration string. 
+ dataRetentionPeriod String @default("720h") + events Event[] workflows Workflow[] jobs Job[] diff --git a/sql/migrations/20240703194656_v0.35.1.sql b/sql/migrations/20240703194656_v0.35.1.sql index 9b4e7f02b..9e4cb5dbf 100644 --- a/sql/migrations/20240703194656_v0.35.1.sql +++ b/sql/migrations/20240703194656_v0.35.1.sql @@ -1,24 +1,26 @@ +-- atlas:txmode none + -- Create index "Event_createdAt_idx" to table: "Event" -CREATE INDEX "Event_createdAt_idx" ON "Event" ("createdAt"); +CREATE INDEX CONCURRENTLY IF NOT EXISTS "Event_createdAt_idx" ON "Event" ("createdAt"); -- Create index "Event_tenantId_createdAt_idx" to table: "Event" -CREATE INDEX "Event_tenantId_createdAt_idx" ON "Event" ("tenantId", "createdAt"); +CREATE INDEX CONCURRENTLY IF NOT EXISTS "Event_tenantId_createdAt_idx" ON "Event" ("tenantId", "createdAt"); -- Create index "Event_tenantId_idx" to table: "Event" -CREATE INDEX "Event_tenantId_idx" ON "Event" ("tenantId"); +CREATE INDEX CONCURRENTLY IF NOT EXISTS "Event_tenantId_idx" ON "Event" ("tenantId"); -- Create index "WorkflowRun_createdAt_idx" to table: "WorkflowRun" -CREATE INDEX "WorkflowRun_createdAt_idx" ON "WorkflowRun" ("createdAt"); +CREATE INDEX CONCURRENTLY IF NOT EXISTS "WorkflowRun_createdAt_idx" ON "WorkflowRun" ("createdAt"); -- Create index "WorkflowRun_finishedAt_idx" to table: "WorkflowRun" -CREATE INDEX "WorkflowRun_finishedAt_idx" ON "WorkflowRun" ("finishedAt"); +CREATE INDEX CONCURRENTLY IF NOT EXISTS "WorkflowRun_finishedAt_idx" ON "WorkflowRun" ("finishedAt"); -- Create index "WorkflowRun_status_idx" to table: "WorkflowRun" -CREATE INDEX "WorkflowRun_status_idx" ON "WorkflowRun" ("status"); +CREATE INDEX CONCURRENTLY IF NOT EXISTS "WorkflowRun_status_idx" ON "WorkflowRun" ("status"); -- Create index "WorkflowRun_tenantId_createdAt_idx" to table: "WorkflowRun" -CREATE INDEX "WorkflowRun_tenantId_createdAt_idx" ON "WorkflowRun" ("tenantId", "createdAt"); +CREATE INDEX CONCURRENTLY IF NOT EXISTS "WorkflowRun_tenantId_createdAt_idx" ON "WorkflowRun" ("tenantId", "createdAt"); -- Create index "WorkflowRun_tenantId_idx" to table: "WorkflowRun" -CREATE INDEX "WorkflowRun_tenantId_idx" ON "WorkflowRun" ("tenantId"); +CREATE INDEX CONCURRENTLY IF NOT EXISTS "WorkflowRun_tenantId_idx" ON "WorkflowRun" ("tenantId"); -- Create index "WorkflowRun_workflowVersionId_idx" to table: "WorkflowRun" -CREATE INDEX "WorkflowRun_workflowVersionId_idx" ON "WorkflowRun" ("workflowVersionId"); +CREATE INDEX CONCURRENTLY IF NOT EXISTS "WorkflowRun_workflowVersionId_idx" ON "WorkflowRun" ("workflowVersionId"); -- Create index "WorkflowRunTriggeredBy_eventId_idx" to table: "WorkflowRunTriggeredBy" -CREATE INDEX "WorkflowRunTriggeredBy_eventId_idx" ON "WorkflowRunTriggeredBy" ("eventId"); +CREATE INDEX CONCURRENTLY IF NOT EXISTS "WorkflowRunTriggeredBy_eventId_idx" ON "WorkflowRunTriggeredBy" ("eventId"); -- Create index "WorkflowRunTriggeredBy_parentId_idx" to table: "WorkflowRunTriggeredBy" -CREATE INDEX "WorkflowRunTriggeredBy_parentId_idx" ON "WorkflowRunTriggeredBy" ("parentId"); +CREATE INDEX CONCURRENTLY IF NOT EXISTS "WorkflowRunTriggeredBy_parentId_idx" ON "WorkflowRunTriggeredBy" ("parentId"); -- Create index "WorkflowRunTriggeredBy_tenantId_idx" to table: "WorkflowRunTriggeredBy" -CREATE INDEX "WorkflowRunTriggeredBy_tenantId_idx" ON "WorkflowRunTriggeredBy" ("tenantId"); +CREATE INDEX CONCURRENTLY IF NOT EXISTS "WorkflowRunTriggeredBy_tenantId_idx" ON "WorkflowRunTriggeredBy" ("tenantId"); diff --git a/sql/migrations/20240704211315_v0.35.2.sql 
b/sql/migrations/20240704211315_v0.35.2.sql new file mode 100644 index 000000000..7ca8caeb7 --- /dev/null +++ b/sql/migrations/20240704211315_v0.35.2.sql @@ -0,0 +1,2 @@ +-- Modify "Tenant" table +ALTER TABLE "Tenant" ADD COLUMN "dataRetentionPeriod" text NOT NULL DEFAULT '720h'; diff --git a/sql/migrations/atlas.sum b/sql/migrations/atlas.sum index d73395b73..2e04f8860 100644 --- a/sql/migrations/atlas.sum +++ b/sql/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:hAw5/ZClxRS7GL8N4lsT5pu8L8S+Akh6+Jlm0fVML5k= +h1:0tkfIj+EH3zq3hwM1t4wJ5wHmGjmD9NOMvWB2fElF/g= 20240115180414_init.sql h1:Ef3ZyjAHkmJPdGF/dEWCahbwgcg6uGJKnDxW2JCRi2k= 20240122014727_v0_6_0.sql h1:o/LdlteAeFgoHJ3e/M4Xnghqt9826IE/Y/h0q95Acuo= 20240126235456_v0_7_0.sql h1:KiVzt/hXgQ6esbdC6OMJOOWuYEXmy1yeCpmsVAHTFKs= @@ -35,4 +35,5 @@ h1:hAw5/ZClxRS7GL8N4lsT5pu8L8S+Akh6+Jlm0fVML5k= 20240625180548_v0.34.0.sql h1:77uSk0VF/jBvEPHCqWC4hmMQqUx4zVnMdTryGsIXt9s= 20240626204339_v0.34.2.sql h1:e2hArnEfcEYcBjEPxZW3axkl4CGt2lHa1oIA2r2fjfY= 20240701144852_v0_35_0.sql h1:q8pPeq4LZp7hxZZp4P08xctwAdQFKDEA9vbj1Ulbn7U= -20240703194656_v0.35.1.sql h1:2plFt+n4AhwJqEICU20O9nHnSVDBubZxg8Rnr3eZD+Y= +20240703194656_v0.35.1.sql h1:wg/DWVOmWy7UiXrimlnwongTcT0aJa4pYSOYkiREgNg= +20240704211315_v0.35.2.sql h1:/AzVYp+jzwPGx8JHUCPjBi2CnXmFvtsTWL3SgrC49IE= diff --git a/sql/schema/schema.sql b/sql/schema/schema.sql index 1db0bbc47..c970d7b40 100644 --- a/sql/schema/schema.sql +++ b/sql/schema/schema.sql @@ -362,6 +362,7 @@ CREATE TABLE "Tenant" ( "alertMemberEmails" BOOLEAN NOT NULL DEFAULT true, "controllerPartitionId" TEXT, "workerPartitionId" TEXT, + "dataRetentionPeriod" TEXT NOT NULL DEFAULT '720h', CONSTRAINT "Tenant_pkey" PRIMARY KEY ("id") );