feat: configurable data retention period (#693)

* feat: data retention for tenants

* chore: generate and docs

* chore: lint
This commit is contained in:
abelanger5
2024-07-06 10:31:12 -04:00
committed by GitHub
parent 2b44f6fab6
commit f36e66cd28
31 changed files with 977 additions and 81 deletions

View File

@@ -66,7 +66,6 @@ tasks:
- task: generate-sqlc
seed-dev:
cmds:
- sh ./hack/dev/run-go-with-env.sh run github.com/steebchen/prisma-client-go migrate dev --skip-generate
- SEED_DEVELOPMENT=true sh ./hack/dev/run-go-with-env.sh run ./cmd/hatchet-admin seed
start-dev:
deps:

View File

@@ -49,6 +49,10 @@ func (t *TenantService) TenantCreate(ctx echo.Context, request gen.TenantCreateR
Name: request.Body.Name,
}
if t.config.Runtime.Limits.DefaultTenantRetentionPeriod != "" {
createOpts.DataRetentionPeriod = &t.config.Runtime.Limits.DefaultTenantRetentionPeriod
}
// write the user to the db
tenant, err := t.config.APIRepository.Tenant().CreateTenant(createOpts)

View File

@@ -1,21 +1,22 @@
{
"index": "Introduction",
"-- Docker": {
"type": "separator",
"title": "Docker"
},
"hatchet-lite": "Hatchet Lite",
"docker-compose": "Docker Compose",
"-- Kubernetes": {
"type": "separator",
"title": "Kubernetes"
},
"kubernetes-quickstart": "Quickstart",
"kubernetes-glasskube": "Installing with Glasskube",
"networking": "Networking",
"-- Managing Hatchet": {
"type": "separator",
"title": "Managing Hatchet"
},
"configuration-options": "Configuration Options"
"index": "Introduction",
"-- Docker": {
"type": "separator",
"title": "Docker"
},
"hatchet-lite": "Hatchet Lite",
"docker-compose": "Docker Compose",
"-- Kubernetes": {
"type": "separator",
"title": "Kubernetes"
},
"kubernetes-quickstart": "Quickstart",
"kubernetes-glasskube": "Installing with Glasskube",
"networking": "Networking",
"-- Managing Hatchet": {
"type": "separator",
"title": "Managing Hatchet"
},
"configuration-options": "Configuration Options",
"data-retention": "Data Retention"
}

View File

@@ -1,8 +1,6 @@
# Configuration Options
The Hatchet server and engine can be configured via `HATCHET_SERVER` environment variables. This document contains a list of all available options.
This document outlines the environment variables used to configure the server. These variables are grouped based on the configuration sections they belong to.
The Hatchet server and engine can be configured via `SERVER` and `DATABASE` environment variables. This document contains a list of all available options.
## Runtime Configuration
@@ -14,13 +12,59 @@ This document outlines the environment variables used to configure the server. T
| `SERVER_GRPC_BIND_ADDRESS` | GRPC server bind address | `127.0.0.1` |
| `SERVER_GRPC_BROADCAST_ADDRESS` | GRPC server broadcast address | `127.0.0.1:7070` |
| `SERVER_GRPC_INSECURE` | Controls if the GRPC server is insecure | `false` |
| `SERVER_WORKER_ENABLED` | Whether the internal worker is enabled | `false` |
| `SERVER_SHUTDOWN_WAIT` | Shutdown wait duration | `20s` |
| `SERVER_ENFORCE_LIMITS` | Enforce tenant limits | `false` |
| `SERVER_ALLOW_SIGNUP` | Allow new tenant signups | `true` |
| `SERVER_ALLOW_INVITES` | Allow new invites | `true` |
| `SERVER_ALLOW_CREATE_TENANT` | Allow tenant creation | `true` |
| `SERVER_ALLOW_CHANGE_PASSWORD` | Allow password changes | `true` |
## Services Configuration
## Database Configuration
| Variable | Description | Default Value |
| ----------------- | ------------------------ | ------------------------------------------------------------------------------------------------ |
| `SERVER_SERVICES` | List of enabled services | `["health", "ticker", "grpc", "eventscontroller", "queue", "webhookscontroller", "heartbeater"]` |
| Variable | Description | Default Value |
| ---------------------------- | ------------------------ | ------------- |
| `DATABASE_POSTGRES_HOST` | PostgreSQL host | `127.0.0.1` |
| `DATABASE_POSTGRES_PORT` | PostgreSQL port | `5431` |
| `DATABASE_POSTGRES_USERNAME` | PostgreSQL username | `hatchet` |
| `DATABASE_POSTGRES_PASSWORD` | PostgreSQL password | `hatchet` |
| `DATABASE_POSTGRES_DB_NAME` | PostgreSQL database name | `hatchet` |
| `DATABASE_POSTGRES_SSL_MODE` | PostgreSQL SSL mode | `disable` |
| `DATABASE_MAX_CONNS` | Max database connections | `5` |
| `DATABASE_LOG_QUERIES` | Log database queries | `false` |
| `CACHE_DURATION` | Cache duration | `60s` |
## Security Check Configuration
| Variable | Description | Default Value |
| -------------------------------- | ----------------------- | ------------------------------ |
| `SERVER_SECURITY_CHECK_ENABLED` | Enable security check | `true` |
| `SERVER_SECURITY_CHECK_ENDPOINT` | Security check endpoint | `https://security.hatchet.run` |
## Limit Configuration
| Variable | Description | Default Value |
| ------------------------------------------------ | -------------------------------- | ------------- |
| `SERVER_LIMITS_DEFAULT_TENANT_RETENTION_PERIOD` | Default tenant retention period | `720h` |
| `SERVER_LIMITS_DEFAULT_WORKFLOW_RUN_LIMIT` | Default workflow run limit | `1000` |
| `SERVER_LIMITS_DEFAULT_WORKFLOW_RUN_ALARM_LIMIT` | Default workflow run alarm limit | `750` |
| `SERVER_LIMITS_DEFAULT_WORKFLOW_RUN_WINDOW` | Default workflow run window | `24h` |
| `SERVER_LIMITS_DEFAULT_WORKER_LIMIT` | Default worker limit | `4` |
| `SERVER_LIMITS_DEFAULT_WORKER_ALARM_LIMIT` | Default worker alarm limit | `2` |
| `SERVER_LIMITS_DEFAULT_EVENT_LIMIT` | Default event limit | `1000` |
| `SERVER_LIMITS_DEFAULT_EVENT_ALARM_LIMIT` | Default event alarm limit | `750` |
| `SERVER_LIMITS_DEFAULT_EVENT_WINDOW` | Default event window | `24h` |
| `SERVER_LIMITS_DEFAULT_CRON_LIMIT` | Default cron limit | `5` |
| `SERVER_LIMITS_DEFAULT_CRON_ALARM_LIMIT` | Default cron alarm limit | `2` |
| `SERVER_LIMITS_DEFAULT_SCHEDULE_LIMIT` | Default schedule limit | `1000` |
| `SERVER_LIMITS_DEFAULT_SCHEDULE_ALARM_LIMIT` | Default schedule alarm limit | `750` |
## Alerting Configuration
| Variable | Description | Default Value |
| ------------------------------------ | -------------------------- | ------------- |
| `SERVER_ALERTING_SENTRY_ENABLED` | Enable Sentry for alerting | |
| `SERVER_ALERTING_SENTRY_DSN` | Sentry DSN | |
| `SERVER_ALERTING_SENTRY_ENVIRONMENT` | Sentry environment | `development` |
## Encryption Configuration
@@ -51,12 +95,17 @@ This document outlines the environment variables used to configure the server. T
| `SERVER_AUTH_GOOGLE_CLIENT_ID` | Google auth client ID | |
| `SERVER_AUTH_GOOGLE_CLIENT_SECRET` | Google auth client secret | |
| `SERVER_AUTH_GOOGLE_SCOPES` | Google auth scopes | `["openid", "profile", "email"]` |
| `SERVER_AUTH_GITHUB_ENABLED` | Whether GitHub auth is enabled | `false` |
| `SERVER_AUTH_GITHUB_CLIENT_ID` | GitHub auth client ID | |
| `SERVER_AUTH_GITHUB_CLIENT_SECRET` | GitHub auth client secret | |
| `SERVER_AUTH_GITHUB_SCOPES` | GitHub auth scopes | `["read:user", "user:email"]` |
## Task Queue Configuration
| Variable | Description | Default Value |
| ------------------------------- | ------------ | -------------------------------------- |
| `SERVER_TASKQUEUE_RABBITMQ_URL` | RabbitMQ URL | `amqp://user:password@localhost:5672/` |
| Variable | Description | Default Value |
| ------------------------------ | ------------------ | -------------------------------------- |
| `SERVER_MSGQUEUE_KIND` | Message queue kind | |
| `SERVER_MSGQUEUE_RABBITMQ_URL` | RabbitMQ URL | `amqp://user:password@localhost:5672/` |
## TLS Configuration
@@ -73,10 +122,12 @@ This document outlines the environment variables used to configure the server. T
## Logging Configuration
| Variable | Description | Default Value |
| ---------------------- | ------------- | ------------- |
| `SERVER_LOGGER_LEVEL` | Logger level | |
| `SERVER_LOGGER_FORMAT` | Logger format | |
| Variable | Description | Default Value |
| ------------------------ | ------------- | ------------- |
| `SERVER_LOGGER_LEVEL` | Logger level | |
| `SERVER_LOGGER_FORMAT` | Logger format | |
| `DATABASE_LOGGER_LEVEL` | Logger level | |
| `DATABASE_LOGGER_FORMAT` | Logger format | |
## OpenTelemetry Configuration
@@ -85,16 +136,11 @@ This document outlines the environment variables used to configure the server. T
| `SERVER_OTEL_SERVICE_NAME` | Service name for OpenTelemetry | |
| `SERVER_OTEL_COLLECTOR_URL` | Collector URL for OpenTelemetry | |
## Version Control System (VCS) Configuration
## Tenant Alerting Configuration
| Variable | Description | Default Value |
| -------------------------------------- | ----------------------------- | ------------- |
| `SERVER_VCS_KIND` | Type of VCS | |
| `SERVER_VCS_GITHUB_ENABLED` | Whether GitHub is enabled | |
| `SERVER_VCS_GITHUB_APP_CLIENT_ID` | GitHub app client ID | |
| `SERVER_VCS_GITHUB_APP_CLIENT_SECRET` | GitHub app client secret | |
| `SERVER_VCS_GITHUB_APP_NAME` | GitHub app name | |
| `SERVER_VCS_GITHUB_APP_WEBHOOK_SECRET` | GitHub app webhook secret | |
| `SERVER_VCS_GITHUB_APP_WEBHOOK_URL` | GitHub app webhook URL | |
| `SERVER_VCS_GITHUB_APP_ID` | GitHub app ID | |
| `SERVER_VCS_GITHUB_APP_SECRET_PATH` | Path to the GitHub app secret | |
| Variable | Description | Default Value |
| -------------------------------------------- | -------------------------------- | ---------------------- |
| `SERVER_TENANT_ALERTING_SLACK_ENABLED` | Enable Slack for tenant alerting | |
| `SERVER_TENANT_ALERTING_SLACK_CLIENT_ID` | Slack client ID | |
| `SERVER_TENANT_ALERTING_SLACK_CLIENT_SECRET` | Slack client secret | |
| `SERVER_TENANT_ALERTING_SLACK_SCOPES` | Slack scopes | `["incoming-webhook"]` |

View File

@@ -0,0 +1,9 @@
# Data Retention
In Hatchet engine version `0.36.0` and above, you can configure the default data retention per tenant for workflow runs and events. The default value is set to 30 days, which means that all workflow runs which were created over 30 days ago and are in a final state (i.e. completed or failed), and all events which were created over 30 days ago, will be deleted.
This can be configured by setting the following environment variable to a Go duration string:
```sh
SERVER_LIMITS_DEFAULT_TENANT_RETENTION_PERIOD=720h # 30 days
```

View File

@@ -685,7 +685,7 @@ func (jc *JobsControllerImpl) runStepRunReassign(ctx context.Context, startedAt
err = g.Wait()
if err != nil {
jc.l.Err(err).Msg("could not run step run requeue")
jc.l.Err(err).Msg("could not run step run reassign")
}
}
}

View File

@@ -182,6 +182,30 @@ func (wc *WorkflowsControllerImpl) Start() (func() error, error) {
return nil, fmt.Errorf("could not schedule get group key run reassign: %w", err)
}
_, err = wc.s.NewJob(
gocron.DurationJob(time.Second*60),
gocron.NewTask(
wc.runDeleteExpiredWorkflowRuns(ctx),
),
)
if err != nil {
cancel()
return nil, fmt.Errorf("could not delete expired workflow runs: %w", err)
}
_, err = wc.s.NewJob(
gocron.DurationJob(time.Second*60),
gocron.NewTask(
wc.runDeleteExpiredEvents(ctx),
),
)
if err != nil {
cancel()
return nil, fmt.Errorf("could not delete expired events: %w", err)
}
wc.s.Start()
f := func(task *msgqueue.Message) error {

View File

@@ -493,6 +493,139 @@ func (ec *WorkflowsControllerImpl) runGetGroupKeyRunReassignTenant(ctx context.C
return g.Wait()
}
func (wc *WorkflowsControllerImpl) runDeleteExpiredWorkflowRuns(ctx context.Context) func() {
return func() {
ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()
wc.l.Debug().Msgf("workflows controller: deleting expired workflow runs")
// list all tenants
tenants, err := wc.repo.Tenant().ListTenantsByControllerPartition(ctx, wc.partitionId)
if err != nil {
wc.l.Err(err).Msg("could not list tenants")
return
}
g := new(errgroup.Group)
for i := range tenants {
index := i
g.Go(func() error {
return wc.runDeleteExpiredWorkflowRunsTenant(ctx, *tenants[index])
})
}
err = g.Wait()
if err != nil {
wc.l.Err(err).Msg("could not run delete expired workflow runs")
}
}
}
func (wc *WorkflowsControllerImpl) runDeleteExpiredWorkflowRunsTenant(ctx context.Context, tenant dbsqlc.Tenant) error {
ctx, span := telemetry.NewSpan(ctx, "delete-expired-workflow-runs")
defer span.End()
tenantId := sqlchelpers.UUIDToStr(tenant.ID)
createdBefore, err := getDataRetentionExpiredTime(tenant.DataRetentionPeriod)
if err != nil {
return fmt.Errorf("could not get data retention expired time: %w", err)
}
// keep deleting until the context is done
for {
select {
case <-ctx.Done():
return nil
default:
}
// delete expired workflow runs
_, remaining, err := wc.repo.WorkflowRun().DeleteExpiredWorkflowRuns(ctx, tenantId, []dbsqlc.WorkflowRunStatus{
dbsqlc.WorkflowRunStatusSUCCEEDED,
dbsqlc.WorkflowRunStatusFAILED,
}, createdBefore)
if err != nil {
return fmt.Errorf("could not delete expired workflow runs: %w", err)
}
if remaining == 0 {
return nil
}
}
}
func (wc *WorkflowsControllerImpl) runDeleteExpiredEvents(ctx context.Context) func() {
return func() {
ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()
wc.l.Debug().Msgf("workflows controller: deleting expired events")
// list all tenants
tenants, err := wc.repo.Tenant().ListTenantsByControllerPartition(ctx, wc.partitionId)
if err != nil {
wc.l.Err(err).Msg("could not list tenants")
return
}
g := new(errgroup.Group)
for i := range tenants {
index := i
g.Go(func() error {
return wc.runDeleteExpiredEventsTenant(ctx, *tenants[index])
})
}
err = g.Wait()
if err != nil {
wc.l.Err(err).Msg("could not run delete expired events")
}
}
}
func (wc *WorkflowsControllerImpl) runDeleteExpiredEventsTenant(ctx context.Context, tenant dbsqlc.Tenant) error {
ctx, span := telemetry.NewSpan(ctx, "delete-expired-events")
defer span.End()
tenantId := sqlchelpers.UUIDToStr(tenant.ID)
createdBefore, err := getDataRetentionExpiredTime(tenant.DataRetentionPeriod)
if err != nil {
return fmt.Errorf("could not get data retention expired time: %w", err)
}
// keep deleting until the context is done
for {
select {
case <-ctx.Done():
return nil
default:
}
// delete expired workflow runs
_, remaining, err := wc.repo.Event().DeleteExpiredEvents(ctx, tenantId, createdBefore)
if err != nil {
return fmt.Errorf("could not delete expired events: %w", err)
}
if remaining == 0 {
return nil
}
}
}
func (wc *WorkflowsControllerImpl) queueByCancelInProgress(ctx context.Context, tenantId, groupKey string, workflowVersion *dbsqlc.GetWorkflowVersionForEngineRow) error {
ctx, span := telemetry.NewSpan(ctx, "queue-by-cancel-in-progress")
defer span.End()
@@ -697,3 +830,13 @@ func getStepRunNotifyCancelTask(tenantId, stepRunId, reason string) *msgqueue.Me
Retries: 3,
}
}
func getDataRetentionExpiredTime(duration string) (time.Time, error) {
d, err := time.ParseDuration(duration)
if err != nil {
return time.Time{}, fmt.Errorf("could not parse duration: %w", err)
}
return time.Now().UTC().Add(-d), nil
}

View File

@@ -100,6 +100,8 @@ type SecurityCheckConfigFile struct {
}
type LimitConfigFile struct {
DefaultTenantRetentionPeriod string `mapstructure:"defaultTenantRetentionPeriod" json:"defaultTenantRetentionPeriod,omitempty" default:"720h"`
DefaultWorkflowRunLimit int `mapstructure:"defaultWorkflowRunLimit" json:"defaultWorkflowRunLimit,omitempty" default:"1000"`
DefaultWorkflowRunAlarmLimit int `mapstructure:"defaultWorkflowRunAlarmLimit" json:"defaultWorkflowRunAlarmLimit,omitempty" default:"750"`
DefaultWorkflowRunWindow time.Duration `mapstructure:"defaultWorkflowRunWindow" json:"defaultWorkflowRunWindow,omitempty" default:"24h"`
@@ -371,6 +373,8 @@ func BindAllEnv(v *viper.Viper) {
_ = v.BindEnv("securityCheck.endpoint", "SERVER_SECURITY_CHECK_ENDPOINT")
// limit options
_ = v.BindEnv("runtime.limits.defaultTenantRetentionPeriod", "SERVER_LIMITS_DEFAULT_TENANT_RETENTION_PERIOD")
_ = v.BindEnv("runtime.limits.defaultWorkflowRunLimit", "SERVER_LIMITS_DEFAULT_WORKFLOW_RUN_LIMIT")
_ = v.BindEnv("runtime.limits.defaultWorkflowRunAlarmLimit", "SERVER_LIMITS_DEFAULT_WORKFLOW_RUN_ALARM_LIMIT")
_ = v.BindEnv("runtime.limits.defaultWorkflowRunWindow", "SERVER_LIMITS_DEFAULT_WORKFLOW_RUN_WINDOW")

View File

@@ -2,6 +2,7 @@ package repository
import (
"context"
"time"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/db"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/dbsqlc"
@@ -85,4 +86,8 @@ type EventEngineRepository interface {
GetEventForEngine(ctx context.Context, tenantId, id string) (*dbsqlc.Event, error)
ListEventsByIds(ctx context.Context, tenantId string, ids []string) ([]*dbsqlc.Event, error)
// DeleteExpiredEvents deletes events that were created before the given time. It returns the number of deleted events
// and the number of non-deleted events that match the conditions.
DeleteExpiredEvents(ctx context.Context, tenantId string, before time.Time) (int, int, error)
}

View File

@@ -245,6 +245,9 @@ model Tenant {
workerPartition TenantWorkerPartition? @relation(fields: [workerPartitionId], references: [id], onDelete: SetNull, onUpdate: SetNull)
workerPartitionId String?
// The data retention period for deletable resources. This is a Go duration string.
dataRetentionPeriod String @default("720h")
events Event[]
workflows Workflow[]
jobs Job[]
@@ -2082,6 +2085,7 @@ const (
TenantScalarFieldEnumAnalyticsOptOut TenantScalarFieldEnum = "analyticsOptOut"
TenantScalarFieldEnumControllerPartitionID TenantScalarFieldEnum = "controllerPartitionId"
TenantScalarFieldEnumWorkerPartitionID TenantScalarFieldEnum = "workerPartitionId"
TenantScalarFieldEnumDataRetentionPeriod TenantScalarFieldEnum = "dataRetentionPeriod"
TenantScalarFieldEnumAlertMemberEmails TenantScalarFieldEnum = "alertMemberEmails"
)
@@ -2865,6 +2869,8 @@ const tenantFieldWorkerPartition tenantPrismaFields = "workerPartition"
const tenantFieldWorkerPartitionID tenantPrismaFields = "workerPartitionId"
const tenantFieldDataRetentionPeriod tenantPrismaFields = "dataRetentionPeriod"
const tenantFieldEvents tenantPrismaFields = "events"
const tenantFieldWorkflows tenantPrismaFields = "workflows"
@@ -6829,6 +6835,7 @@ type InnerTenant struct {
AnalyticsOptOut bool `json:"analyticsOptOut"`
ControllerPartitionID *string `json:"controllerPartitionId,omitempty"`
WorkerPartitionID *string `json:"workerPartitionId,omitempty"`
DataRetentionPeriod string `json:"dataRetentionPeriod"`
AlertMemberEmails bool `json:"alertMemberEmails"`
}
@@ -6843,6 +6850,7 @@ type RawTenantModel struct {
AnalyticsOptOut RawBoolean `json:"analyticsOptOut"`
ControllerPartitionID *RawString `json:"controllerPartitionId,omitempty"`
WorkerPartitionID *RawString `json:"workerPartitionId,omitempty"`
DataRetentionPeriod RawString `json:"dataRetentionPeriod"`
AlertMemberEmails RawBoolean `json:"alertMemberEmails"`
}
@@ -27492,6 +27500,11 @@ type tenantQuery struct {
// @optional
WorkerPartitionID tenantQueryWorkerPartitionIDString
// DataRetentionPeriod
//
// @required
DataRetentionPeriod tenantQueryDataRetentionPeriodString
Events tenantQueryEventsRelations
Workflows tenantQueryWorkflowsRelations
@@ -30656,6 +30669,353 @@ func (r tenantQueryWorkerPartitionIDString) Field() tenantPrismaFields {
return tenantFieldWorkerPartitionID
}
// base struct
type tenantQueryDataRetentionPeriodString struct{}
// Set the required value of DataRetentionPeriod
func (r tenantQueryDataRetentionPeriodString) Set(value string) tenantSetParam {
return tenantSetParam{
data: builder.Field{
Name: "dataRetentionPeriod",
Value: value,
},
}
}
// Set the optional value of DataRetentionPeriod dynamically
func (r tenantQueryDataRetentionPeriodString) SetIfPresent(value *String) tenantSetParam {
if value == nil {
return tenantSetParam{}
}
return r.Set(*value)
}
func (r tenantQueryDataRetentionPeriodString) Equals(value string) tenantWithPrismaDataRetentionPeriodEqualsParam {
return tenantWithPrismaDataRetentionPeriodEqualsParam{
data: builder.Field{
Name: "dataRetentionPeriod",
Fields: []builder.Field{
{
Name: "equals",
Value: value,
},
},
},
}
}
func (r tenantQueryDataRetentionPeriodString) EqualsIfPresent(value *string) tenantWithPrismaDataRetentionPeriodEqualsParam {
if value == nil {
return tenantWithPrismaDataRetentionPeriodEqualsParam{}
}
return r.Equals(*value)
}
func (r tenantQueryDataRetentionPeriodString) Order(direction SortOrder) tenantDefaultParam {
return tenantDefaultParam{
data: builder.Field{
Name: "dataRetentionPeriod",
Value: direction,
},
}
}
func (r tenantQueryDataRetentionPeriodString) Cursor(cursor string) tenantCursorParam {
return tenantCursorParam{
data: builder.Field{
Name: "dataRetentionPeriod",
Value: cursor,
},
}
}
func (r tenantQueryDataRetentionPeriodString) In(value []string) tenantDefaultParam {
return tenantDefaultParam{
data: builder.Field{
Name: "dataRetentionPeriod",
Fields: []builder.Field{
{
Name: "in",
Value: value,
},
},
},
}
}
func (r tenantQueryDataRetentionPeriodString) InIfPresent(value []string) tenantDefaultParam {
if value == nil {
return tenantDefaultParam{}
}
return r.In(value)
}
func (r tenantQueryDataRetentionPeriodString) NotIn(value []string) tenantDefaultParam {
return tenantDefaultParam{
data: builder.Field{
Name: "dataRetentionPeriod",
Fields: []builder.Field{
{
Name: "notIn",
Value: value,
},
},
},
}
}
func (r tenantQueryDataRetentionPeriodString) NotInIfPresent(value []string) tenantDefaultParam {
if value == nil {
return tenantDefaultParam{}
}
return r.NotIn(value)
}
func (r tenantQueryDataRetentionPeriodString) Lt(value string) tenantDefaultParam {
return tenantDefaultParam{
data: builder.Field{
Name: "dataRetentionPeriod",
Fields: []builder.Field{
{
Name: "lt",
Value: value,
},
},
},
}
}
func (r tenantQueryDataRetentionPeriodString) LtIfPresent(value *string) tenantDefaultParam {
if value == nil {
return tenantDefaultParam{}
}
return r.Lt(*value)
}
func (r tenantQueryDataRetentionPeriodString) Lte(value string) tenantDefaultParam {
return tenantDefaultParam{
data: builder.Field{
Name: "dataRetentionPeriod",
Fields: []builder.Field{
{
Name: "lte",
Value: value,
},
},
},
}
}
func (r tenantQueryDataRetentionPeriodString) LteIfPresent(value *string) tenantDefaultParam {
if value == nil {
return tenantDefaultParam{}
}
return r.Lte(*value)
}
func (r tenantQueryDataRetentionPeriodString) Gt(value string) tenantDefaultParam {
return tenantDefaultParam{
data: builder.Field{
Name: "dataRetentionPeriod",
Fields: []builder.Field{
{
Name: "gt",
Value: value,
},
},
},
}
}
func (r tenantQueryDataRetentionPeriodString) GtIfPresent(value *string) tenantDefaultParam {
if value == nil {
return tenantDefaultParam{}
}
return r.Gt(*value)
}
func (r tenantQueryDataRetentionPeriodString) Gte(value string) tenantDefaultParam {
return tenantDefaultParam{
data: builder.Field{
Name: "dataRetentionPeriod",
Fields: []builder.Field{
{
Name: "gte",
Value: value,
},
},
},
}
}
func (r tenantQueryDataRetentionPeriodString) GteIfPresent(value *string) tenantDefaultParam {
if value == nil {
return tenantDefaultParam{}
}
return r.Gte(*value)
}
func (r tenantQueryDataRetentionPeriodString) Contains(value string) tenantDefaultParam {
return tenantDefaultParam{
data: builder.Field{
Name: "dataRetentionPeriod",
Fields: []builder.Field{
{
Name: "contains",
Value: value,
},
},
},
}
}
func (r tenantQueryDataRetentionPeriodString) ContainsIfPresent(value *string) tenantDefaultParam {
if value == nil {
return tenantDefaultParam{}
}
return r.Contains(*value)
}
func (r tenantQueryDataRetentionPeriodString) StartsWith(value string) tenantDefaultParam {
return tenantDefaultParam{
data: builder.Field{
Name: "dataRetentionPeriod",
Fields: []builder.Field{
{
Name: "startsWith",
Value: value,
},
},
},
}
}
func (r tenantQueryDataRetentionPeriodString) StartsWithIfPresent(value *string) tenantDefaultParam {
if value == nil {
return tenantDefaultParam{}
}
return r.StartsWith(*value)
}
func (r tenantQueryDataRetentionPeriodString) EndsWith(value string) tenantDefaultParam {
return tenantDefaultParam{
data: builder.Field{
Name: "dataRetentionPeriod",
Fields: []builder.Field{
{
Name: "endsWith",
Value: value,
},
},
},
}
}
func (r tenantQueryDataRetentionPeriodString) EndsWithIfPresent(value *string) tenantDefaultParam {
if value == nil {
return tenantDefaultParam{}
}
return r.EndsWith(*value)
}
func (r tenantQueryDataRetentionPeriodString) Mode(value QueryMode) tenantDefaultParam {
return tenantDefaultParam{
data: builder.Field{
Name: "dataRetentionPeriod",
Fields: []builder.Field{
{
Name: "mode",
Value: value,
},
},
},
}
}
func (r tenantQueryDataRetentionPeriodString) ModeIfPresent(value *QueryMode) tenantDefaultParam {
if value == nil {
return tenantDefaultParam{}
}
return r.Mode(*value)
}
func (r tenantQueryDataRetentionPeriodString) Not(value string) tenantDefaultParam {
return tenantDefaultParam{
data: builder.Field{
Name: "dataRetentionPeriod",
Fields: []builder.Field{
{
Name: "not",
Value: value,
},
},
},
}
}
func (r tenantQueryDataRetentionPeriodString) NotIfPresent(value *string) tenantDefaultParam {
if value == nil {
return tenantDefaultParam{}
}
return r.Not(*value)
}
// deprecated: Use StartsWith instead.
func (r tenantQueryDataRetentionPeriodString) HasPrefix(value string) tenantDefaultParam {
return tenantDefaultParam{
data: builder.Field{
Name: "dataRetentionPeriod",
Fields: []builder.Field{
{
Name: "starts_with",
Value: value,
},
},
},
}
}
// deprecated: Use StartsWithIfPresent instead.
func (r tenantQueryDataRetentionPeriodString) HasPrefixIfPresent(value *string) tenantDefaultParam {
if value == nil {
return tenantDefaultParam{}
}
return r.HasPrefix(*value)
}
// deprecated: Use EndsWith instead.
func (r tenantQueryDataRetentionPeriodString) HasSuffix(value string) tenantDefaultParam {
return tenantDefaultParam{
data: builder.Field{
Name: "dataRetentionPeriod",
Fields: []builder.Field{
{
Name: "ends_with",
Value: value,
},
},
},
}
}
// deprecated: Use EndsWithIfPresent instead.
func (r tenantQueryDataRetentionPeriodString) HasSuffixIfPresent(value *string) tenantDefaultParam {
if value == nil {
return tenantDefaultParam{}
}
return r.HasSuffix(*value)
}
func (r tenantQueryDataRetentionPeriodString) Field() tenantPrismaFields {
return tenantFieldDataRetentionPeriod
}
// base struct
type tenantQueryEventsEvent struct{}
@@ -178450,6 +178810,7 @@ var tenantOutput = []builder.Output{
{Name: "analyticsOptOut"},
{Name: "controllerPartitionId"},
{Name: "workerPartitionId"},
{Name: "dataRetentionPeriod"},
{Name: "alertMemberEmails"},
}
@@ -179475,6 +179836,84 @@ func (p tenantWithPrismaWorkerPartitionIDEqualsUniqueParam) workerPartitionIDFie
func (tenantWithPrismaWorkerPartitionIDEqualsUniqueParam) unique() {}
func (tenantWithPrismaWorkerPartitionIDEqualsUniqueParam) equals() {}
type TenantWithPrismaDataRetentionPeriodEqualsSetParam interface {
field() builder.Field
getQuery() builder.Query
equals()
tenantModel()
dataRetentionPeriodField()
}
type TenantWithPrismaDataRetentionPeriodSetParam interface {
field() builder.Field
getQuery() builder.Query
tenantModel()
dataRetentionPeriodField()
}
type tenantWithPrismaDataRetentionPeriodSetParam struct {
data builder.Field
query builder.Query
}
func (p tenantWithPrismaDataRetentionPeriodSetParam) field() builder.Field {
return p.data
}
func (p tenantWithPrismaDataRetentionPeriodSetParam) getQuery() builder.Query {
return p.query
}
func (p tenantWithPrismaDataRetentionPeriodSetParam) tenantModel() {}
func (p tenantWithPrismaDataRetentionPeriodSetParam) dataRetentionPeriodField() {}
type TenantWithPrismaDataRetentionPeriodWhereParam interface {
field() builder.Field
getQuery() builder.Query
tenantModel()
dataRetentionPeriodField()
}
type tenantWithPrismaDataRetentionPeriodEqualsParam struct {
data builder.Field
query builder.Query
}
func (p tenantWithPrismaDataRetentionPeriodEqualsParam) field() builder.Field {
return p.data
}
func (p tenantWithPrismaDataRetentionPeriodEqualsParam) getQuery() builder.Query {
return p.query
}
func (p tenantWithPrismaDataRetentionPeriodEqualsParam) tenantModel() {}
func (p tenantWithPrismaDataRetentionPeriodEqualsParam) dataRetentionPeriodField() {}
func (tenantWithPrismaDataRetentionPeriodSetParam) settable() {}
func (tenantWithPrismaDataRetentionPeriodEqualsParam) equals() {}
type tenantWithPrismaDataRetentionPeriodEqualsUniqueParam struct {
data builder.Field
query builder.Query
}
func (p tenantWithPrismaDataRetentionPeriodEqualsUniqueParam) field() builder.Field {
return p.data
}
func (p tenantWithPrismaDataRetentionPeriodEqualsUniqueParam) getQuery() builder.Query {
return p.query
}
func (p tenantWithPrismaDataRetentionPeriodEqualsUniqueParam) tenantModel() {}
func (p tenantWithPrismaDataRetentionPeriodEqualsUniqueParam) dataRetentionPeriodField() {}
func (tenantWithPrismaDataRetentionPeriodEqualsUniqueParam) unique() {}
func (tenantWithPrismaDataRetentionPeriodEqualsUniqueParam) equals() {}
type TenantWithPrismaEventsEqualsSetParam interface {
field() builder.Field
getQuery() builder.Query

View File

@@ -133,3 +133,25 @@ FROM
WHERE
"tenantId" = @tenantId::uuid AND
"id" = ANY (sqlc.arg('ids')::uuid[]);
-- name: DeleteExpiredEvents :one
WITH expired_events_count AS (
SELECT COUNT(*) as count
FROM "Event" e1
WHERE
e1."tenantId" = @tenantId::uuid AND
e1."createdAt" < @createdBefore::timestamp
), expired_events_with_limit AS (
SELECT
"id"
FROM "Event" e2
WHERE
e2."tenantId" = @tenantId::uuid AND
e2."createdAt" < @createdBefore::timestamp
ORDER BY "createdAt" ASC
LIMIT sqlc.arg('limit')
)
DELETE FROM "Event"
WHERE
"id" IN (SELECT "id" FROM expired_events_with_limit)
RETURNING (SELECT count FROM expired_events_count) as total, (SELECT count FROM expired_events_count) - (SELECT COUNT(*) FROM expired_events_with_limit) as remaining, (SELECT COUNT(*) FROM expired_events_with_limit) as deleted;

View File

@@ -128,6 +128,48 @@ func (q *Queries) CreateEvent(ctx context.Context, db DBTX, arg CreateEventParam
return &i, err
}
const deleteExpiredEvents = `-- name: DeleteExpiredEvents :one
WITH expired_events_count AS (
SELECT COUNT(*) as count
FROM "Event" e1
WHERE
e1."tenantId" = $1::uuid AND
e1."createdAt" < $2::timestamp
), expired_events_with_limit AS (
SELECT
"id"
FROM "Event" e2
WHERE
e2."tenantId" = $1::uuid AND
e2."createdAt" < $2::timestamp
ORDER BY "createdAt" ASC
LIMIT $3
)
DELETE FROM "Event"
WHERE
"id" IN (SELECT "id" FROM expired_events_with_limit)
RETURNING (SELECT count FROM expired_events_count) as total, (SELECT count FROM expired_events_count) - (SELECT COUNT(*) FROM expired_events_with_limit) as remaining, (SELECT COUNT(*) FROM expired_events_with_limit) as deleted
`
type DeleteExpiredEventsParams struct {
Tenantid pgtype.UUID `json:"tenantid"`
Createdbefore pgtype.Timestamp `json:"createdbefore"`
Limit int32 `json:"limit"`
}
type DeleteExpiredEventsRow struct {
Total int64 `json:"total"`
Remaining int32 `json:"remaining"`
Deleted int64 `json:"deleted"`
}
func (q *Queries) DeleteExpiredEvents(ctx context.Context, db DBTX, arg DeleteExpiredEventsParams) (*DeleteExpiredEventsRow, error) {
row := db.QueryRow(ctx, deleteExpiredEvents, arg.Tenantid, arg.Createdbefore, arg.Limit)
var i DeleteExpiredEventsRow
err := row.Scan(&i.Total, &i.Remaining, &i.Deleted)
return &i, err
}
const getEventForEngine = `-- name: GetEventForEngine :one
SELECT
id, "createdAt", "updatedAt", "deletedAt", key, "tenantId", "replayedFromId", data, "additionalMetadata"

View File

@@ -876,6 +876,7 @@ type Tenant struct {
AlertMemberEmails bool `json:"alertMemberEmails"`
ControllerPartitionId pgtype.Text `json:"controllerPartitionId"`
WorkerPartitionId pgtype.Text `json:"workerPartitionId"`
DataRetentionPeriod string `json:"dataRetentionPeriod"`
}
type TenantAlertEmailGroup struct {

View File

@@ -362,6 +362,7 @@ CREATE TABLE "Tenant" (
"alertMemberEmails" BOOLEAN NOT NULL DEFAULT true,
"controllerPartitionId" TEXT,
"workerPartitionId" TEXT,
"dataRetentionPeriod" TEXT NOT NULL DEFAULT '720h',
CONSTRAINT "Tenant_pkey" PRIMARY KEY ("id")
);

View File

@@ -7,7 +7,7 @@ WITH active_controller_partitions AS (
WHERE
"lastHeartbeat" > NOW() - INTERVAL '1 minute'
)
INSERT INTO "Tenant" ("id", "name", "slug", "controllerPartitionId")
INSERT INTO "Tenant" ("id", "name", "slug", "controllerPartitionId", "dataRetentionPeriod")
VALUES (
sqlc.arg('id')::uuid,
sqlc.arg('name')::text,
@@ -20,7 +20,8 @@ VALUES (
ORDER BY
random()
LIMIT 1
)
),
COALESCE(sqlc.narg('dataRetentionPeriod')::text, '720h')
)
RETURNING *;

View File

@@ -39,7 +39,7 @@ WITH active_controller_partitions AS (
WHERE
"lastHeartbeat" > NOW() - INTERVAL '1 minute'
)
INSERT INTO "Tenant" ("id", "name", "slug", "controllerPartitionId")
INSERT INTO "Tenant" ("id", "name", "slug", "controllerPartitionId", "dataRetentionPeriod")
VALUES (
$1::uuid,
$2::text,
@@ -52,19 +52,26 @@ VALUES (
ORDER BY
random()
LIMIT 1
)
),
COALESCE($4::text, '720h')
)
RETURNING id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId"
RETURNING id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId", "dataRetentionPeriod"
`
type CreateTenantParams struct {
ID pgtype.UUID `json:"id"`
Name string `json:"name"`
Slug string `json:"slug"`
ID pgtype.UUID `json:"id"`
Name string `json:"name"`
Slug string `json:"slug"`
DataRetentionPeriod pgtype.Text `json:"dataRetentionPeriod"`
}
func (q *Queries) CreateTenant(ctx context.Context, db DBTX, arg CreateTenantParams) (*Tenant, error) {
row := db.QueryRow(ctx, createTenant, arg.ID, arg.Name, arg.Slug)
row := db.QueryRow(ctx, createTenant,
arg.ID,
arg.Name,
arg.Slug,
arg.DataRetentionPeriod,
)
var i Tenant
err := row.Scan(
&i.ID,
@@ -77,6 +84,7 @@ func (q *Queries) CreateTenant(ctx context.Context, db DBTX, arg CreateTenantPar
&i.AlertMemberEmails,
&i.ControllerPartitionId,
&i.WorkerPartitionId,
&i.DataRetentionPeriod,
)
return &i, err
}
@@ -295,7 +303,7 @@ func (q *Queries) GetTenantAlertingSettings(ctx context.Context, db DBTX, tenant
const getTenantByID = `-- name: GetTenantByID :one
SELECT
id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId"
id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId", "dataRetentionPeriod"
FROM
"Tenant" as tenants
WHERE
@@ -316,6 +324,7 @@ func (q *Queries) GetTenantByID(ctx context.Context, db DBTX, id pgtype.UUID) (*
&i.AlertMemberEmails,
&i.ControllerPartitionId,
&i.WorkerPartitionId,
&i.DataRetentionPeriod,
)
return &i, err
}
@@ -458,7 +467,7 @@ func (q *Queries) GetTenantWorkflowQueueMetrics(ctx context.Context, db DBTX, ar
const listTenants = `-- name: ListTenants :many
SELECT
id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId"
id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId", "dataRetentionPeriod"
FROM
"Tenant" as tenants
`
@@ -483,6 +492,7 @@ func (q *Queries) ListTenants(ctx context.Context, db DBTX) ([]*Tenant, error) {
&i.AlertMemberEmails,
&i.ControllerPartitionId,
&i.WorkerPartitionId,
&i.DataRetentionPeriod,
); err != nil {
return nil, err
}
@@ -504,7 +514,7 @@ WITH update_partition AS (
"id" = $1::text
)
SELECT
id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId"
id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId", "dataRetentionPeriod"
FROM
"Tenant" as tenants
WHERE
@@ -531,6 +541,7 @@ func (q *Queries) ListTenantsByControllerPartitionId(ctx context.Context, db DBT
&i.AlertMemberEmails,
&i.ControllerPartitionId,
&i.WorkerPartitionId,
&i.DataRetentionPeriod,
); err != nil {
return nil, err
}
@@ -552,7 +563,7 @@ WITH update_partition AS (
"id" = $1::text
)
SELECT
id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId"
id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId", "dataRetentionPeriod"
FROM
"Tenant" as tenants
WHERE
@@ -579,6 +590,7 @@ func (q *Queries) ListTenantsByTenantWorkerPartitionId(ctx context.Context, db D
&i.AlertMemberEmails,
&i.ControllerPartitionId,
&i.WorkerPartitionId,
&i.DataRetentionPeriod,
); err != nil {
return nil, err
}
@@ -617,7 +629,7 @@ WHERE
"controllerPartitionId" IS NULL OR
"controllerPartitionId" NOT IN (SELECT "id" FROM active_partitions)
)
RETURNING id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId"
RETURNING id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId", "dataRetentionPeriod"
`
func (q *Queries) RebalanceAllControllerPartitions(ctx context.Context, db DBTX) error {
@@ -652,7 +664,7 @@ WHERE
"workerPartitionId" IS NULL OR
"workerPartitionId" NOT IN (SELECT "id" FROM active_partitions)
)
RETURNING id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId"
RETURNING id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId", "dataRetentionPeriod"
`
func (q *Queries) RebalanceAllTenantWorkerPartitions(ctx context.Context, db DBTX) error {

View File

@@ -599,3 +599,27 @@ WHERE
(sqlc.narg('childKey')::text IS NULL AND "childIndex" = @childIndex) OR
(sqlc.narg('childKey')::text IS NOT NULL AND "childKey" = sqlc.narg('childKey')::text)
);
-- name: DeleteExpiredWorkflowRuns :one
WITH expired_runs_count AS (
SELECT COUNT(*) as count
FROM "WorkflowRun" wr1
WHERE
wr1."tenantId" = @tenantId::uuid AND
wr1."status" = ANY(cast(sqlc.narg('statuses')::text[] as "WorkflowRunStatus"[])) AND
wr1."createdAt" < @createdBefore::timestamp
), expired_runs_with_limit AS (
SELECT
"id"
FROM "WorkflowRun" wr2
WHERE
wr2."tenantId" = @tenantId::uuid AND
wr2."status" = ANY(cast(sqlc.narg('statuses')::text[] as "WorkflowRunStatus"[])) AND
wr2."createdAt" < @createdBefore::timestamp
ORDER BY "createdAt" ASC
LIMIT sqlc.arg('limit')
)
DELETE FROM "WorkflowRun"
WHERE
"id" IN (SELECT "id" FROM expired_runs_with_limit)
RETURNING (SELECT count FROM expired_runs_count) as total, (SELECT count FROM expired_runs_count) - (SELECT COUNT(*) FROM expired_runs_with_limit) as remaining, (SELECT COUNT(*) FROM expired_runs_with_limit) as deleted;

View File

@@ -490,6 +490,56 @@ func (q *Queries) CreateWorkflowRunTriggeredBy(ctx context.Context, db DBTX, arg
return &i, err
}
const deleteExpiredWorkflowRuns = `-- name: DeleteExpiredWorkflowRuns :one
WITH expired_runs_count AS (
SELECT COUNT(*) as count
FROM "WorkflowRun" wr1
WHERE
wr1."tenantId" = $1::uuid AND
wr1."status" = ANY(cast($2::text[] as "WorkflowRunStatus"[])) AND
wr1."createdAt" < $3::timestamp
), expired_runs_with_limit AS (
SELECT
"id"
FROM "WorkflowRun" wr2
WHERE
wr2."tenantId" = $1::uuid AND
wr2."status" = ANY(cast($2::text[] as "WorkflowRunStatus"[])) AND
wr2."createdAt" < $3::timestamp
ORDER BY "createdAt" ASC
LIMIT $4
)
DELETE FROM "WorkflowRun"
WHERE
"id" IN (SELECT "id" FROM expired_runs_with_limit)
RETURNING (SELECT count FROM expired_runs_count) as total, (SELECT count FROM expired_runs_count) - (SELECT COUNT(*) FROM expired_runs_with_limit) as remaining, (SELECT COUNT(*) FROM expired_runs_with_limit) as deleted
`
type DeleteExpiredWorkflowRunsParams struct {
Tenantid pgtype.UUID `json:"tenantid"`
Statuses []string `json:"statuses"`
Createdbefore pgtype.Timestamp `json:"createdbefore"`
Limit int32 `json:"limit"`
}
type DeleteExpiredWorkflowRunsRow struct {
Total int64 `json:"total"`
Remaining int32 `json:"remaining"`
Deleted int64 `json:"deleted"`
}
func (q *Queries) DeleteExpiredWorkflowRuns(ctx context.Context, db DBTX, arg DeleteExpiredWorkflowRunsParams) (*DeleteExpiredWorkflowRunsRow, error) {
row := db.QueryRow(ctx, deleteExpiredWorkflowRuns,
arg.Tenantid,
arg.Statuses,
arg.Createdbefore,
arg.Limit,
)
var i DeleteExpiredWorkflowRunsRow
err := row.Scan(&i.Total, &i.Remaining, &i.Deleted)
return &i, err
}
const getChildWorkflowRun = `-- name: GetChildWorkflowRun :one
SELECT
"createdAt", "updatedAt", "deletedAt", "tenantId", "workflowVersionId", status, error, "startedAt", "finishedAt", "concurrencyGroupId", "displayName", id, "childIndex", "childKey", "parentId", "parentStepRunId", "additionalMetadata"

View File

@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"time"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
@@ -285,3 +286,21 @@ func (r *eventEngineRepository) ListEventsByIds(ctx context.Context, tenantId st
Ids: pgIds,
})
}
func (r *eventEngineRepository) DeleteExpiredEvents(ctx context.Context, tenantId string, before time.Time) (int, int, error) {
resp, err := r.queries.DeleteExpiredEvents(ctx, r.pool, dbsqlc.DeleteExpiredEventsParams{
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
Createdbefore: sqlchelpers.TimestampFromTime(before),
Limit: 1000,
})
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return 0, 0, nil
}
return 0, 0, err
}
return int(resp.Deleted), int(resp.Remaining), nil
}

View File

@@ -51,6 +51,12 @@ func (r *tenantAPIRepository) CreateTenant(opts *repository.CreateTenantOpts) (*
tenantId = *opts.ID
}
var dataRetentionPeriod pgtype.Text
if opts.DataRetentionPeriod != nil {
dataRetentionPeriod = sqlchelpers.TextFromStr(*opts.DataRetentionPeriod)
}
tx, err := r.pool.Begin(context.Background())
if err != nil {
@@ -60,9 +66,10 @@ func (r *tenantAPIRepository) CreateTenant(opts *repository.CreateTenantOpts) (*
defer deferRollback(context.Background(), r.l, tx.Rollback)
createTenant, err := r.queries.CreateTenant(context.Background(), tx, dbsqlc.CreateTenantParams{
ID: sqlchelpers.UUIDFromStr(tenantId),
Slug: opts.Slug,
Name: opts.Name,
ID: sqlchelpers.UUIDFromStr(tenantId),
Slug: opts.Slug,
Name: opts.Name,
DataRetentionPeriod: dataRetentionPeriod,
})
if err != nil {

View File

@@ -242,6 +242,31 @@ func (w *workflowRunEngineRepository) CreateNewWorkflowRun(ctx context.Context,
return id, nil
}
func (w *workflowRunEngineRepository) DeleteExpiredWorkflowRuns(ctx context.Context, tenantId string, statuses []dbsqlc.WorkflowRunStatus, before time.Time) (int, int, error) {
paramStatuses := make([]string, 0)
for _, status := range statuses {
paramStatuses = append(paramStatuses, string(status))
}
resp, err := w.queries.DeleteExpiredWorkflowRuns(ctx, w.pool, dbsqlc.DeleteExpiredWorkflowRunsParams{
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
Statuses: paramStatuses,
Createdbefore: sqlchelpers.TimestampFromTime(before),
Limit: 1000,
})
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return 0, 0, nil
}
return 0, 0, err
}
return int(resp.Deleted), int(resp.Remaining), nil
}
func listWorkflowRuns(ctx context.Context, pool *pgxpool.Pool, queries *dbsqlc.Queries, l *zerolog.Logger, tenantId string, opts *repository.ListWorkflowRunsOpts) (*repository.ListWorkflowRunsResult, error) {
res := &repository.ListWorkflowRunsResult{}

View File

@@ -16,6 +16,9 @@ type CreateTenantOpts struct {
// (optional) the tenant ID
ID *string `validate:"omitempty,uuid"`
// (optional) the tenant data retention period
DataRetentionPeriod *string `validate:"omitempty,duration"`
}
type UpdateTenantOpts struct {

View File

@@ -376,4 +376,8 @@ type WorkflowRunEngineRepository interface {
// GetWorkflowRunById returns a workflow run by id.
GetWorkflowRunById(ctx context.Context, tenantId, runId string) (*dbsqlc.GetWorkflowRunRow, error)
// DeleteExpiredWorkflowRuns deletes workflow runs that were created before the given time. It returns the number of deleted runs
// and the number of non-deleted runs that match the conditions.
DeleteExpiredWorkflowRuns(ctx context.Context, tenantId string, statuses []dbsqlc.WorkflowRunStatus, before time.Time) (int, int, error)
}

View File

@@ -18,7 +18,7 @@ const (
HatchetNameErr = "Hatchet names must match the regex ^[a-zA-Z0-9\\.\\-_]+$"
ActionIDErr = "Invalid action ID. Action IDs must be in the format <integrationId>:<verb>"
CronErr = "Invalid cron expression"
DurationErr = "Invalid duration. Durations must be in the format <number><unit>, where unit is one of: 's', 'm', 'h', 'd', 'w', 'M', 'y'"
DurationErr = "Invalid duration. Durations must be in the format <number><unit>, where unit is one of: 's', 'm', 'h'"
)
type APIErrors gen.APIErrors

View File

@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "Tenant" ADD COLUMN "dataRetentionPeriod" TEXT NOT NULL DEFAULT '720h';

View File

@@ -171,6 +171,9 @@ model Tenant {
workerPartition TenantWorkerPartition? @relation(fields: [workerPartitionId], references: [id], onDelete: SetNull, onUpdate: SetNull)
workerPartitionId String?
// The data retention period for deletable resources. This is a Go duration string.
dataRetentionPeriod String @default("720h")
events Event[]
workflows Workflow[]
jobs Job[]

View File

@@ -1,24 +1,26 @@
-- atlas:txmode none
-- Create index "Event_createdAt_idx" to table: "Event"
CREATE INDEX "Event_createdAt_idx" ON "Event" ("createdAt");
CREATE INDEX CONCURRENTLY IF NOT EXISTS "Event_createdAt_idx" ON "Event" ("createdAt");
-- Create index "Event_tenantId_createdAt_idx" to table: "Event"
CREATE INDEX "Event_tenantId_createdAt_idx" ON "Event" ("tenantId", "createdAt");
CREATE INDEX CONCURRENTLY IF NOT EXISTS "Event_tenantId_createdAt_idx" ON "Event" ("tenantId", "createdAt");
-- Create index "Event_tenantId_idx" to table: "Event"
CREATE INDEX "Event_tenantId_idx" ON "Event" ("tenantId");
CREATE INDEX CONCURRENTLY IF NOT EXISTS "Event_tenantId_idx" ON "Event" ("tenantId");
-- Create index "WorkflowRun_createdAt_idx" to table: "WorkflowRun"
CREATE INDEX "WorkflowRun_createdAt_idx" ON "WorkflowRun" ("createdAt");
CREATE INDEX CONCURRENTLY IF NOT EXISTS "WorkflowRun_createdAt_idx" ON "WorkflowRun" ("createdAt");
-- Create index "WorkflowRun_finishedAt_idx" to table: "WorkflowRun"
CREATE INDEX "WorkflowRun_finishedAt_idx" ON "WorkflowRun" ("finishedAt");
CREATE INDEX CONCURRENTLY IF NOT EXISTS "WorkflowRun_finishedAt_idx" ON "WorkflowRun" ("finishedAt");
-- Create index "WorkflowRun_status_idx" to table: "WorkflowRun"
CREATE INDEX "WorkflowRun_status_idx" ON "WorkflowRun" ("status");
CREATE INDEX CONCURRENTLY IF NOT EXISTS "WorkflowRun_status_idx" ON "WorkflowRun" ("status");
-- Create index "WorkflowRun_tenantId_createdAt_idx" to table: "WorkflowRun"
CREATE INDEX "WorkflowRun_tenantId_createdAt_idx" ON "WorkflowRun" ("tenantId", "createdAt");
CREATE INDEX CONCURRENTLY IF NOT EXISTS "WorkflowRun_tenantId_createdAt_idx" ON "WorkflowRun" ("tenantId", "createdAt");
-- Create index "WorkflowRun_tenantId_idx" to table: "WorkflowRun"
CREATE INDEX "WorkflowRun_tenantId_idx" ON "WorkflowRun" ("tenantId");
CREATE INDEX CONCURRENTLY IF NOT EXISTS "WorkflowRun_tenantId_idx" ON "WorkflowRun" ("tenantId");
-- Create index "WorkflowRun_workflowVersionId_idx" to table: "WorkflowRun"
CREATE INDEX "WorkflowRun_workflowVersionId_idx" ON "WorkflowRun" ("workflowVersionId");
CREATE INDEX CONCURRENTLY IF NOT EXISTS "WorkflowRun_workflowVersionId_idx" ON "WorkflowRun" ("workflowVersionId");
-- Create index "WorkflowRunTriggeredBy_eventId_idx" to table: "WorkflowRunTriggeredBy"
CREATE INDEX "WorkflowRunTriggeredBy_eventId_idx" ON "WorkflowRunTriggeredBy" ("eventId");
CREATE INDEX CONCURRENTLY IF NOT EXISTS "WorkflowRunTriggeredBy_eventId_idx" ON "WorkflowRunTriggeredBy" ("eventId");
-- Create index "WorkflowRunTriggeredBy_parentId_idx" to table: "WorkflowRunTriggeredBy"
CREATE INDEX "WorkflowRunTriggeredBy_parentId_idx" ON "WorkflowRunTriggeredBy" ("parentId");
CREATE INDEX CONCURRENTLY IF NOT EXISTS "WorkflowRunTriggeredBy_parentId_idx" ON "WorkflowRunTriggeredBy" ("parentId");
-- Create index "WorkflowRunTriggeredBy_tenantId_idx" to table: "WorkflowRunTriggeredBy"
CREATE INDEX "WorkflowRunTriggeredBy_tenantId_idx" ON "WorkflowRunTriggeredBy" ("tenantId");
CREATE INDEX CONCURRENTLY IF NOT EXISTS "WorkflowRunTriggeredBy_tenantId_idx" ON "WorkflowRunTriggeredBy" ("tenantId");

View File

@@ -0,0 +1,2 @@
-- Modify "Tenant" table
ALTER TABLE "Tenant" ADD COLUMN "dataRetentionPeriod" text NOT NULL DEFAULT '720h';

View File

@@ -1,4 +1,4 @@
h1:hAw5/ZClxRS7GL8N4lsT5pu8L8S+Akh6+Jlm0fVML5k=
h1:0tkfIj+EH3zq3hwM1t4wJ5wHmGjmD9NOMvWB2fElF/g=
20240115180414_init.sql h1:Ef3ZyjAHkmJPdGF/dEWCahbwgcg6uGJKnDxW2JCRi2k=
20240122014727_v0_6_0.sql h1:o/LdlteAeFgoHJ3e/M4Xnghqt9826IE/Y/h0q95Acuo=
20240126235456_v0_7_0.sql h1:KiVzt/hXgQ6esbdC6OMJOOWuYEXmy1yeCpmsVAHTFKs=
@@ -35,4 +35,5 @@ h1:hAw5/ZClxRS7GL8N4lsT5pu8L8S+Akh6+Jlm0fVML5k=
20240625180548_v0.34.0.sql h1:77uSk0VF/jBvEPHCqWC4hmMQqUx4zVnMdTryGsIXt9s=
20240626204339_v0.34.2.sql h1:e2hArnEfcEYcBjEPxZW3axkl4CGt2lHa1oIA2r2fjfY=
20240701144852_v0_35_0.sql h1:q8pPeq4LZp7hxZZp4P08xctwAdQFKDEA9vbj1Ulbn7U=
20240703194656_v0.35.1.sql h1:2plFt+n4AhwJqEICU20O9nHnSVDBubZxg8Rnr3eZD+Y=
20240703194656_v0.35.1.sql h1:wg/DWVOmWy7UiXrimlnwongTcT0aJa4pYSOYkiREgNg=
20240704211315_v0.35.2.sql h1:/AzVYp+jzwPGx8JHUCPjBi2CnXmFvtsTWL3SgrC49IE=

View File

@@ -362,6 +362,7 @@ CREATE TABLE "Tenant" (
"alertMemberEmails" BOOLEAN NOT NULL DEFAULT true,
"controllerPartitionId" TEXT,
"workerPartitionId" TEXT,
"dataRetentionPeriod" TEXT NOT NULL DEFAULT '720h',
CONSTRAINT "Tenant_pkey" PRIMARY KEY ("id")
);