Files
hatchet/prisma/schema.prisma
T
Gabe Ruttner 44addbb47e Feat scheduled improvements (#992)
* wip: stub schedule page

* wip: stub list

* fix: 2025 bug...

* feat: wip cron list

* feat: addl meta

* feat: expose metadata column

* feat: sort and created at

* cron to recurring

* scheduled: with statuses

* fix: links

* feat: expose schedule ids

* feat: delete run

* fix: remove search

* feat: filterable scheduled

* fix: remove broken features

* chore: lint

* rm metadata for now

* chore: lint

* chore: recurring to cron job

* fix: review comments

* fix: populator
2024-11-01 07:16:20 -04:00

1860 lines
45 KiB
Plaintext

datasource db {
provider = "postgresql"
url = env("DATABASE_URL")
}
generator go {
provider = "go run github.com/steebchen/prisma-client-go"
output = "../pkg/repository/prisma/db"
disableGitignore = true
}
model User {
// base fields
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
deletedAt DateTime?
// the user's email address
email String @unique
// whether the user's email address has been verified
emailVerified Boolean @default(false)
// the user's oauth providers
oauthProviders UserOAuth[]
// The hashed user's password. This is placed in a separate table so that it isn't returned by default.
password UserPassword?
// the user's name
name String?
// the user sessions
sessions UserSession[]
memberships TenantMember[]
}
model UserOAuth {
// base fields
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
// the linked user
user User @relation(fields: [userId], references: [id], onDelete: Cascade, onUpdate: Cascade)
userId String @unique @db.Uuid
// the oauth provider
provider String
// the oauth provider's user id
providerUserId String
// the oauth provider's access token
accessToken Bytes @db.ByteA
// the oauth provider's refresh token
refreshToken Bytes? @db.ByteA
// the oauth provider's expiry time
expiresAt DateTime?
// oauth should be unique per user id + provider
@@unique([userId, provider])
}
model UserPassword {
hash String
user User @relation(fields: [userId], references: [id], onDelete: Cascade, onUpdate: Cascade)
userId String @unique @db.Uuid
}
model UserSession {
// base fields
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
// the linked user. The user can be empty if the session is created but not authenticated.
user User? @relation(fields: [userId], references: [id], onDelete: Cascade, onUpdate: Cascade)
userId String? @db.Uuid
// arbitrary session data
data Json?
// the expiry time of the session
expiresAt DateTime
}
// WebhookWorker can be used to handle workflows via webhooks
model WebhookWorker {
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
name String
secret String
url String @unique
// stores the encrypted hatchet auth token
tokenValue String?
deleted Boolean @default(false)
tokenId String? @db.Uuid
token APIToken? @relation(fields: [tokenId], references: [id], onDelete: Cascade, onUpdate: Cascade)
tenantId String @db.Uuid
tenant Tenant @relation(fields: [tenantId], references: [id], onDelete: Cascade, onUpdate: Cascade)
webhookWorkerWorkflows WebhookWorkerWorkflow[]
worker Worker?
requests WebhookWorkerRequest[]
}
enum WebhookWorkerRequestMethod {
GET
POST
PUT
}
model WebhookWorkerRequest {
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
// the parent webhook worker
webhookWorker WebhookWorker @relation(fields: [webhookWorkerId], references: [id], onDelete: Cascade, onUpdate: Cascade)
webhookWorkerId String @db.Uuid
// the request method
method WebhookWorkerRequestMethod
// the request status code
statusCode Int
}
model WebhookWorkerWorkflow {
id String @id @unique @default(uuid()) @db.Uuid
webhookWorker WebhookWorker @relation(fields: [webhookWorkerId], references: [id], onDelete: Cascade, onUpdate: Cascade)
webhookWorkerId String @db.Uuid
workflow Workflow @relation(fields: [workflowId], references: [id], onDelete: Cascade, onUpdate: Cascade)
workflowId String @db.Uuid
@@unique([webhookWorkerId, workflowId])
}
// ControllerPartition represents an engine instance that only handles a subset of tenants. This is used for list
// operations across tenants. If tenants do not have a partition, they are included in all partitions.
model ControllerPartition {
id String @id @unique
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
// lastHeartbeat is used for rebalancing partitions
lastHeartbeat DateTime?
// name of the partition
name String?
tenants Tenant[]
}
// SchedulerPartition represents an engine instance the handles scheduling.
model SchedulerPartition {
id String @id @unique
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
// lastHeartbeat is used for rebalancing partitions
lastHeartbeat DateTime?
// name of the partition
name String?
tenants Tenant[]
}
model TenantWorkerPartition {
id String @id @unique
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
// lastHeartbeat is used for rebalancing partitions
lastHeartbeat DateTime?
// name of the partition
name String?
tenants Tenant[]
}
// Tenant represents a unique tenant in the database. Each tenant-scoped resource should have the tenant as
// an identifier, which makes tenant isolation easier.
model Tenant {
// base fields
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
deletedAt DateTime?
name String
slug String @unique
// whether the user has opted out of analytics
analyticsOptOut Boolean @default(false)
// the parent controller partition, if exists
controllerPartition ControllerPartition? @relation(fields: [controllerPartitionId], references: [id], onDelete: SetNull, onUpdate: SetNull)
controllerPartitionId String?
// the parent scheduler partition, if exists
schedulerPartition SchedulerPartition? @relation(fields: [schedulerPartitionId], references: [id], onDelete: SetNull, onUpdate: SetNull)
schedulerPartitionId String?
// the parent worker partition, if exists
workerPartition TenantWorkerPartition? @relation(fields: [workerPartitionId], references: [id], onDelete: SetNull, onUpdate: SetNull)
workerPartitionId String?
// The data retention period for deletable resources. This is a Go duration string.
dataRetentionPeriod String @default("720h")
triggers WorkflowTriggers[]
members TenantMember[]
workflowTags WorkflowTag[]
actions Action[]
services Service[]
invites TenantInviteLink[]
apiTokens APIToken[]
vcsProviders TenantVcsProvider[]
snsIntegrations SNSIntegration[]
alertEmailGroups TenantAlertEmailGroup[]
// alertMemberEmails controls whether to send alert emails to tenant members in addition to the alert email groups
alertMemberEmails Boolean @default(true)
slackWebhooks SlackAppWebhook[]
alertingSettings TenantAlertingSettings?
limits TenantResourceLimit[]
limitAlerts TenantResourceLimitAlert[]
webhookWorkers WebhookWorker[]
@@index([controllerPartitionId])
@@index([workerPartitionId])
}
enum LimitResource {
WORKFLOW_RUN
EVENT
WORKER
CRON
SCHEDULE
}
model TenantResourceLimit {
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
resource LimitResource
tenant Tenant @relation(fields: [tenantId], references: [id], onDelete: Cascade, onUpdate: Cascade)
tenantId String @db.Uuid
// The max number of requests allowed in the window
limitValue Int
// The max number before an alert is triggered
alarmValue Int?
// the current rate limit bucket value
value Int @default(0)
// the meter window
window String?
// the time the rate limit was last refilled
lastRefill DateTime @default(now())
customValueMeter Boolean @default(false)
alerts TenantResourceLimitAlert[]
@@unique([tenantId, resource])
}
enum TenantResourceLimitAlertType {
Alarm
Exhausted
}
model TenantResourceLimitAlert {
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
resourceLimit TenantResourceLimit @relation(fields: [resourceLimitId], references: [id], onDelete: Cascade, onUpdate: Cascade)
resourceLimitId String @db.Uuid
tenant Tenant @relation(fields: [tenantId], references: [id], onDelete: Cascade, onUpdate: Cascade)
tenantId String @db.Uuid
resource LimitResource
alertType TenantResourceLimitAlertType
// the current value of the resource limit at the time of the alert
value Int
// the limit at the time of the alert
limit Int
}
model TenantAlertingSettings {
// base fields
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
deletedAt DateTime?
// the parent tenant
tenant Tenant @relation(fields: [tenantId], references: [id], onDelete: Cascade, onUpdate: Cascade)
tenantId String @unique @db.Uuid
// workflow run failure alerts
enableWorkflowRunFailureAlerts Boolean @default(false)
enableExpiringTokenAlerts Boolean @default(true)
enableTenantResourceLimitAlerts Boolean @default(true)
// the maximum alerting frequency
maxFrequency String @default("1h")
lastAlertedAt DateTime?
// the assigned ticker
ticker Ticker? @relation(fields: [tickerId], references: [id])
tickerId String? @db.Uuid
}
enum TenantMemberRole {
OWNER
ADMIN
MEMBER
}
model TenantMember {
// base fields
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
// the parent tenant
tenant Tenant @relation(fields: [tenantId], references: [id], onDelete: Cascade, onUpdate: Cascade)
tenantId String @db.Uuid
// the linked user
user User @relation(fields: [userId], references: [id], onDelete: Cascade, onUpdate: Cascade)
userId String @db.Uuid
// the member's role
role TenantMemberRole
// members are unique per tenant
@@unique([tenantId, userId])
}
enum InviteLinkStatus {
PENDING
ACCEPTED
REJECTED
}
model TenantInviteLink {
// base fields
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
tenant Tenant @relation(fields: [tenantId], references: [id], onDelete: Cascade, onUpdate: Cascade)
tenantId String @db.Uuid
inviterEmail String
inviteeEmail String
expires DateTime
status InviteLinkStatus @default(PENDING)
role TenantMemberRole @default(OWNER)
}
model APIToken {
// base fields
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
// when it expires
expiresAt DateTime?
// whether the token has been revoked
revoked Boolean @default(false)
// when to next alert about expiration
nextAlertAt DateTime?
// whether this token is for internal (internal to Hatchet) use
internal Boolean @default(false)
// an optional name for the token
name String?
tenant Tenant? @relation(fields: [tenantId], references: [id], onDelete: Cascade, onUpdate: Cascade)
tenantId String? @db.Uuid
webhookWorkers WebhookWorker[]
}
model EventKey {
id BigInt @id @default(autoincrement()) @db.BigInt
key String
tenantId String @db.Uuid
@@unique([key, tenantId])
}
// Event represents an event in the database.
model Event {
// base fields
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
deletedAt DateTime?
// the event key
key String
// the parent tenant
tenantId String @db.Uuid
// the event which was replayed
replayedFrom Event? @relation("EventReplay", fields: [replayedFromId], references: [id])
replayedFromId String? @db.Uuid
// the events which were replayed
replays Event[] @relation("EventReplay")
// data stored in the event
data Json?
// metadata stored in the event
additionalMetadata Json?
// this is used to order the events on bulk inserts
insertOrder Int?
@@index([tenantId])
@@index([createdAt])
@@index([tenantId, createdAt])
}
model WorkflowTag {
// base fields
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
// the parent workflow
tenant Tenant @relation(fields: [tenantId], references: [id], onDelete: Cascade, onUpdate: Cascade)
tenantId String @db.Uuid
// the tag name
name String
// the tag color
color String @default("#93C5FD") // a nice indigo
// the workflows this tag is linked to
workflows Workflow[]
// tags are unique per tenant
@@unique([tenantId, name])
}
model Workflow {
// base fields
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
deletedAt DateTime?
// the parent tenant
tenantId String @db.Uuid
isPaused Boolean? @default(false)
// the workflow name
name String
// the workflow description
description String?
// tracked versions of the workflow
versions WorkflowVersion[]
// the tags for this workflow
tags WorkflowTag[]
webhookWorkerWorkflows WebhookWorkerWorkflow[]
// workflow names are unique per tenant
@@unique([tenantId, name])
@@index([deletedAt])
}
enum StickyStrategy {
SOFT
HARD
}
enum WorkflowKind {
FUNCTION
DURABLE
DAG
}
model WorkflowVersion {
// base fields
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
deletedAt DateTime?
// note that checksums don't need to be unique, as they're computed from the workflow
// declaration, which can be the same for multiple versions (e.g. on revert)
checksum String
version String?
order BigInt @default(autoincrement()) @db.BigInt
// the parent workflow
workflow Workflow @relation(fields: [workflowId], references: [id], onDelete: Cascade, onUpdate: Cascade)
workflowId String @db.Uuid
// the declared triggers for the job
triggers WorkflowTriggers?
// concurrency limits for the workflow
concurrency WorkflowConcurrency?
// sticky strategy for the workflow to assign steps to the same worker
sticky StickyStrategy?
// the declared jobs
jobs Job[]
// a job that runs when the workflow fails
onFailureJob Job? @relation("OnFailureJob", fields: [onFailureJobId], references: [id])
onFailureJobId String? @unique @db.Uuid
// the scheduled runs for the workflow
scheduled WorkflowTriggerScheduledRef[]
// the kind of workflow
kind WorkflowKind @default(DAG)
// the default amount of time to wait while scheduling a step run
scheduleTimeout String @default("5m")
// default priority for the workflow
defaultPriority Int?
@@index([deletedAt])
}
enum ConcurrencyLimitStrategy {
// Cancel the existing runs and start a new one
CANCEL_IN_PROGRESS
// Don't create a new run if concurrency limit has been reached
DROP_NEWEST
// Queue new runs and start them when falling below the concurrency limit
QUEUE_NEWEST
// Performs round-robin queueing based on the concurrency group. For example, if there
// are 10 workflows queued in concurrency groups A, B and C, and the concurrency limit is 3,
// then 1 workflows from A, B and C will be started.
GROUP_ROUND_ROBIN
}
model WorkflowConcurrency {
// base fields
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
// the parent workflow
workflow WorkflowVersion @relation(fields: [workflowVersionId], references: [id], onDelete: Cascade, onUpdate: Cascade)
workflowVersionId String @unique @db.Uuid
// An action which gets the concurrency group for the WorkflowRun.
getConcurrencyGroup Action? @relation(fields: [getConcurrencyGroupId], references: [id])
getConcurrencyGroupId String? @db.Uuid
// A CEL expression for getting the concurrency group based on the input to the workflow.
concurrencyGroupExpression String?
// the maximum number of concurrent workflow runs
maxRuns Int @default(1)
// the strategy to use when the concurrency limit is reached
limitStrategy ConcurrencyLimitStrategy @default(CANCEL_IN_PROGRESS)
}
model WorkflowTriggers {
// base fields
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
deletedAt DateTime?
// the parent workflow
workflow WorkflowVersion @relation(fields: [workflowVersionId], references: [id], onDelete: Cascade, onUpdate: Cascade)
workflowVersionId String @unique @db.Uuid
// the parent tenant
tenant Tenant @relation(fields: [tenantId], references: [id], onDelete: Cascade, onUpdate: Cascade)
tenantId String @db.Uuid
// events that trigger this workflow
events WorkflowTriggerEventRef[]
crons WorkflowTriggerCronRef[]
}
model WorkflowTriggerEventRef {
// the parent workflow
parent WorkflowTriggers @relation(fields: [parentId], references: [id], onDelete: Cascade, onUpdate: Cascade)
parentId String @db.Uuid
// the event key
eventKey String
// event references must be unique per workflow
@@unique([parentId, eventKey])
}
model WorkflowTriggerCronRef {
// the parent workflow
parent WorkflowTriggers @relation(fields: [parentId], references: [id], onDelete: Cascade, onUpdate: Cascade)
parentId String @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
deletedAt DateTime?
// the cron expression
cron String
// whether this cron is enabled or not
enabled Boolean @default(true)
// the assigned ticker
ticker Ticker? @relation(fields: [tickerId], references: [id])
tickerId String? @db.Uuid
triggered WorkflowRunTriggeredBy[]
// the input parameters to the scheduled workflow
input Json?
additionalMetadata Json?
// cron references must be unique per workflow
@@unique([parentId, cron])
}
model WorkflowTriggerScheduledRef {
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
deletedAt DateTime?
// the parent workflow
parent WorkflowVersion @relation(fields: [parentId], references: [id], onDelete: Cascade, onUpdate: Cascade)
parentId String @db.Uuid
// the time that the workflow should be triggered
triggerAt DateTime
// the assigned ticker
ticker Ticker? @relation(fields: [tickerId], references: [id])
tickerId String? @db.Uuid
// the input parameters to the scheduled workflow
input Json?
// that parent that spawned this workflow run
parentWorkflowRun WorkflowRun? @relation(fields: [parentWorkflowRunId], references: [id])
parentWorkflowRunId String? @db.Uuid
parentStepRun StepRun? @relation(fields: [parentStepRunId], references: [id])
parentStepRunId String? @db.Uuid
// if this is a child workflow run, the index of the child
childIndex Int?
// a user-defined key for this workflow run
childKey String?
triggered WorkflowRunTriggeredBy?
additionalMetadata Json?
@@unique([parentId, parentStepRunId, childKey])
}
enum JobKind {
// DEFAULT job kinds get started immediately when the workflow execution starts
DEFAULT
// ON_FAILURE job kinds get started when the workflow fails
ON_FAILURE
}
model Job {
// base fields
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
deletedAt DateTime?
// the parent tenant
tenantId String @db.Uuid
// the parent workflow
workflow WorkflowVersion @relation(fields: [workflowVersionId], references: [id], onDelete: Cascade, onUpdate: Cascade)
workflowVersionId String @db.Uuid
// the job name
name String
// the job description
description String?
// the declared steps
steps Step[]
// a timeout value for the job
timeout String?
// the kind of job
kind JobKind @default(DEFAULT)
// a link to workflow
failureRelations WorkflowVersion? @relation("OnFailureJob")
// jobs names are unique per workflow
@@unique([workflowVersionId, name])
}
model Action {
// base fields
id String @id @unique @default(uuid()) @db.Uuid
actionId String
// the action description
description String?
tenant Tenant @relation(fields: [tenantId], references: [id], onDelete: Cascade, onUpdate: Cascade)
tenantId String @db.Uuid
// the action's steps
steps Step[]
// the action's workers
workers Worker[]
// the action's concurrency rules
concurrency WorkflowConcurrency[]
// actions are unique per tenant
@@unique([tenantId, actionId])
}
model StepDesiredWorkerLabel {
id BigInt @id @default(autoincrement()) @db.BigInt
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
step Step @relation(fields: [stepId], references: [id], onDelete: Cascade, onUpdate: Cascade)
stepId String @db.Uuid
key String
strValue String?
intValue Int?
required Boolean
comparator WorkerLabelComparator
weight Int
@@unique([stepId, key])
@@index([stepId])
}
model Step {
// base fields
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
deletedAt DateTime?
// a readable id for the step
readableId String?
// the parent tenant
tenantId String @db.Uuid
// the parent job
job Job @relation(fields: [jobId], references: [id], onDelete: Cascade, onUpdate: Cascade)
jobId String @db.Uuid
// an action id for the step
action Action @relation(fields: [actionId, tenantId], references: [actionId, tenantId])
actionId String
timeout String?
retries Int @default(0)
// customUserData is a JSON object that can be used to store arbitrary data for the step
customUserData Json?
// a list of dependents for this step
children Step[] @relation("StepOrder")
// a list of dependencies for this step
parents Step[] @relation("StepOrder")
// the default amount of time to wait while scheduling a step run
scheduleTimeout String @default("5m")
workerLabels StepDesiredWorkerLabel[]
// readable ids are unique per job
@@unique([jobId, readableId])
}
enum StepRateLimitKind {
// STATIC rate limits are assigned to every step run belonging to these steps
STATIC
// DYNAMIC rate limits are assigned to specific step runs
DYNAMIC
}
model StepRateLimit {
units Int
stepId String @db.Uuid
rateLimit RateLimit @relation(fields: [tenantId, rateLimitKey], references: [tenantId, key])
rateLimitKey String
kind StepRateLimitKind @default(STATIC)
tenantId String @db.Uuid
@@unique([stepId, rateLimitKey])
}
model RateLimit {
// the parent tenant
tenantId String @db.Uuid
// the rate limit key
key String
// The max number of requests allowed in the window
limitValue Int
// the current rate limit bucket value
value Int
// the rate limit window
window String
// the time the rate limit was last refilled
lastRefill DateTime @default(now())
stepRunLimits StepRateLimit[]
// rate limits are unique per tenant
@@unique([tenantId, key])
}
enum WorkflowRunStatus {
PENDING
QUEUED
RUNNING
SUCCEEDED
FAILED
}
model WorkflowRunStickyState {
id BigInt @id @default(autoincrement()) @db.BigInt
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
tenantId String @db.Uuid
// the parent workflow run
workflowRun WorkflowRun @relation(fields: [workflowRunId], references: [id], onDelete: Cascade, onUpdate: Cascade)
workflowRunId String @unique @db.Uuid
// the sticky state
desiredWorkerId String? @db.Uuid
// the sticky state value
strategy StickyStrategy
}
model WorkflowRun {
// base fields
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
deletedAt DateTime?
displayName String?
// the parent tenant
tenantId String @db.Uuid
// the parent workflow
workflowVersionId String @db.Uuid
concurrencyGroupId String?
getGroupKeyRun GetGroupKeyRun?
status WorkflowRunStatus @default(PENDING)
jobRuns JobRun[]
sticky WorkflowRunStickyState?
// the run error
error String?
// the run started at
startedAt DateTime?
// the run finished at
finishedAt DateTime?
// the duration of the run (ms)
duration BigInt? @db.BigInt
// priority of the workflow run
priority Int?
// a list of dependents for this workflow run
children WorkflowRun[] @relation("WorkflowRunChild")
scheduledChildren WorkflowTriggerScheduledRef[]
// that parent that spawned this workflow run
parent WorkflowRun? @relation("WorkflowRunChild", fields: [parentId], references: [id])
parentId String? @db.Uuid
parentStepRun StepRun? @relation(fields: [parentStepRunId], references: [id])
parentStepRunId String? @db.Uuid
// if this is a child workflow run, the index of the child
childIndex Int?
// a user-defined key for this workflow run
childKey String?
// (optional) additional metadata for the workflow run
additionalMetadata Json?
// this is used to order the workflow runs on bulk inserts
insertOrder Int?
@@unique([parentId, parentStepRunId, childKey])
@@index([tenantId])
@@index([workflowVersionId])
@@index([createdAt])
@@index([tenantId, createdAt])
@@index([finishedAt])
@@index([status])
@@index([deletedAt])
}
model WorkflowRunDedupe {
id BigInt @unique @default(autoincrement()) @db.BigInt
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
// the parent tenant
tenantId String @db.Uuid
// the parent workflow
workflowId String @db.Uuid
// the workflow run id which used this dedupe value
workflowRunId String @db.Uuid
// the dedupe value
value String
// DO NOT REMOVE - this uniqueness constraint is cased on in code
@@unique([tenantId, workflowId, value])
@@index([tenantId, value])
}
model GetGroupKeyRun {
// base fields
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
deletedAt DateTime? // TODO verify we're setting this
// the parent tenant
tenantId String @db.Uuid
// the parent workflow run
workflowRun WorkflowRun @relation(fields: [workflowRunId], references: [id], onDelete: Cascade, onUpdate: Cascade)
workflowRunId String @unique @db.Uuid
// the worker assigned to this group key run
worker Worker? @relation(fields: [workerId], references: [id])
workerId String? @db.Uuid
// the assigned ticker
ticker Ticker? @relation(fields: [tickerId], references: [id])
tickerId String? @db.Uuid
// the run status
status StepRunStatus @default(PENDING)
// the group key run input
input Json?
// the group key as output
output String?
// when the step should be requeued
requeueAfter DateTime?
// when the step run times out due to a scheduling timeout (no workers available)
scheduleTimeoutAt DateTime?
// the run error
error String?
// the run started at
startedAt DateTime?
// the run finished at
finishedAt DateTime?
// the run timeout at
timeoutAt DateTime?
// the run cancelled at
cancelledAt DateTime?
// the reason for why the run was cancelled
cancelledReason String?
// errors while cancelling the run
cancelledError String?
@@index([deletedAt])
@@index([tenantId])
@@index([workerId])
@@index([createdAt])
// index for ListStepRunsToReassign, ListStepRunsToRequeue
@@index([tenantId, deletedAt, status])
// index for PollGetGroupKeyRuns
@@index([status, deletedAt, timeoutAt])
}
model WorkflowRunTriggeredBy {
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
deletedAt DateTime?
// the tenant
tenantId String @db.Uuid
// the parent workflow run
parentId String @unique @db.Uuid
// the input if this was triggered manually
input Json?
// the parent event
eventId String? @db.Uuid
// the cron reference that triggered this workflow
cron WorkflowTriggerCronRef? @relation(fields: [cronParentId, cronSchedule], references: [parentId, cron])
cronParentId String? @db.Uuid
cronSchedule String?
// a specific time that triggered this workflow
scheduled WorkflowTriggerScheduledRef? @relation(fields: [scheduledId], references: [id])
scheduledId String? @unique @db.Uuid
@@index([tenantId])
@@index([eventId])
@@index([parentId])
}
enum JobRunStatus {
PENDING
RUNNING
SUCCEEDED
FAILED
CANCELLED
}
model JobRun {
// base fields
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
deletedAt DateTime?
// the parent tenant
tenantId String @db.Uuid
// the parent workflow run
workflowRun WorkflowRun @relation(fields: [workflowRunId], references: [id], onDelete: Cascade, onUpdate: Cascade)
workflowRunId String @db.Uuid
// the parent job
jobId String @db.Uuid
// the assigned ticker
tickerId String? @db.Uuid
stepRuns StepRun[]
// the run status
status JobRunStatus @default(PENDING)
lookupData JobRunLookupData?
// the run result
result Json?
// the run started at
startedAt DateTime?
// the run finished at
finishedAt DateTime?
// the run timeout at
timeoutAt DateTime?
// the run cancelled at
cancelledAt DateTime?
// the reason for why the run was cancelled
cancelledReason String?
// errors while cancelling the run
cancelledError String?
// index for ResolveWorkflowRunStatus and ListJobRunsForWorkflowRun
@@index([workflowRunId, tenantId])
@@index([deletedAt])
}
model JobRunLookupData {
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
deletedAt DateTime?
// the parent job run
jobRun JobRun @relation(fields: [jobRunId], references: [id], onDelete: Cascade, onUpdate: Cascade)
jobRunId String @unique @db.Uuid
// the tenant id
tenantId String @db.Uuid
data Json?
// additional field so we can look up by both job run id and tenant id
@@unique([jobRunId, tenantId])
}
enum StepRunStatus {
// pending states
PENDING
PENDING_ASSIGNMENT // A run is in a pending assignment state if it is waiting for a worker to be assigned to it
ASSIGNED
// running states
RUNNING
CANCELLING
// final states
SUCCEEDED
FAILED
CANCELLED
}
enum StepExpressionKind {
DYNAMIC_RATE_LIMIT_KEY
DYNAMIC_RATE_LIMIT_VALUE
DYNAMIC_RATE_LIMIT_UNITS
DYNAMIC_RATE_LIMIT_WINDOW
}
model StepExpression {
key String
stepId String @db.Uuid
expression String
kind StepExpressionKind
@@id([key, stepId, kind])
}
// StepRunExpressionEval is an evaluated expression for a step run
model StepRunExpressionEval {
key String
stepRunId String @db.Uuid
valueStr String?
valueInt Int?
kind StepExpressionKind
@@id([key, stepRunId, kind])
@@index([stepRunId])
}
model StepRun {
// base fields
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
deletedAt DateTime?
// the parent tenant
tenantId String @db.Uuid
// the parent job run
jobRun JobRun @relation(fields: [jobRunId], references: [id], onDelete: Cascade, onUpdate: Cascade)
jobRunId String @db.Uuid
// the parent step
stepId String @db.Uuid
// a list of dependents for this step
children StepRun[] @relation("StepRunOrder")
// a list of dependencies for this step
parents StepRun[] @relation("StepRunOrder")
order BigInt @default(autoincrement()) @db.BigInt
queue String @default("default")
priority Int?
// the worker assigned to this job
worker Worker? @relation(fields: [workerId], references: [id])
workerId String? @db.Uuid
// the assigned ticker
tickerId String? @db.Uuid
// the run status
status StepRunStatus @default(PENDING)
// the run input
input Json?
// the run output
output Json?
// inputSchema is a JSON object which declares a JSON schema for the input data
inputSchema Json?
// when the step should be requeued
requeueAfter DateTime?
// when the step run times out due to a scheduling timeout (no workers available)
scheduleTimeoutAt DateTime?
// which retry we're on for this step run
retryCount Int @default(0)
// the run error
error String?
// the run started at
startedAt DateTime?
// the run finished at
finishedAt DateTime?
// the run timeout at
timeoutAt DateTime?
// the run cancelled at
cancelledAt DateTime?
// the reason for why the run was cancelled
cancelledReason String?
// errors while cancelling the run
cancelledError String?
// a map of override values to caller files for the step run
callerFiles Json?
// the github branch that this is running on
gitRepoBranch String?
// if the semaphore was released prior to terminal state
semaphoreReleased Boolean @default(false)
archivedResults StepRunResultArchive[]
streamEvents StreamEvent[]
logs LogLine[]
childWorkflowRuns WorkflowRun[]
childSchedules WorkflowTriggerScheduledRef[]
@@index([tenantId])
@@index([workerId])
@@index([createdAt])
// index for LinkStepRunParents
@@index([stepId])
// index for ListStartableStepRuns
@@index([jobRunId, status])
// index for ResolveLaterStepRuns
@@index([id, tenantId])
// index for ResolveJobRunStatus, ResolveLaterStepRuns, and LinkStepRunParents
@@index([jobRunId, tenantId, order])
// NOTE: custom index on "StepRun_jobRunId_status_tenantId_idx" set manually in migrations
// deletedAt index
@@index([deletedAt])
}
model Queue {
id BigInt @id @default(autoincrement()) @db.BigInt
tenantId String @db.Uuid
name String
lastActive DateTime?
@@unique([tenantId, name])
@@index([tenantId, lastActive])
}
model QueueItem {
id BigInt @id @default(autoincrement()) @db.BigInt
// optional params which are specific to step runs
stepRunId String? @db.Uuid
stepId String? @db.Uuid
actionId String?
scheduleTimeoutAt DateTime?
stepTimeout String?
// ALTER TABLE "QueueItem" ADD CONSTRAINT "QueueItem_priority_check" CHECK ("priority" >= 1 AND "priority" <= 4);
priority Int @default(1) // custom migration to set this between 1 and 4
isQueued Boolean
tenantId String @db.Uuid
queue String
// sticky strategy for the queue item to assign steps to the same worker
sticky StickyStrategy?
desiredWorkerId String? @db.Uuid
@@index([isQueued, tenantId, queue, priority(sort: Desc), id], name: "QueueItem_isQueued_priority_tenantId_queue_id_idx_2")
}
enum InternalQueue {
WORKER_SEMAPHORE_COUNT
STEP_RUN_UPDATE
STEP_RUN_UPDATE_V2
WORKFLOW_RUN_UPDATE
WORKFLOW_RUN_PAUSED
}
model InternalQueueItem {
id BigInt @id @default(autoincrement()) @db.BigInt
queue InternalQueue
isQueued Boolean
data Json?
tenantId String @db.Uuid
// ALTER TABLE "InternalQueueItem" ADD CONSTRAINT "InternalQueueItem_priority_check" CHECK ("priority" >= 1 AND "priority" <= 4);
priority Int @default(1) // custom migration to set this between 1 and 4
uniqueKey String?
@@unique([tenantId, queue, uniqueKey])
@@index([isQueued, tenantId, queue, priority(sort: Desc), id])
}
model TimeoutQueueItem {
id BigInt @id @default(autoincrement()) @db.BigInt
// the parent step run
stepRunId String @db.Uuid
retryCount Int
// the time the step run times out due to a scheduling timeout (no workers available)
timeoutAt DateTime
// the parent tenant
tenantId String @db.Uuid
isQueued Boolean
@@unique([stepRunId, retryCount])
@@index([tenantId, isQueued, timeoutAt])
}
model SemaphoreQueueItem {
stepRunId String @id @unique @db.Uuid
workerId String @db.Uuid
// the parent tenant
tenantId String @db.Uuid
@@index([tenantId, workerId])
}
enum StepRunEventReason {
REQUEUED_NO_WORKER
REQUEUED_RATE_LIMIT
RATE_LIMIT_ERROR
SCHEDULING_TIMED_OUT
TIMED_OUT
REASSIGNED
ASSIGNED
SENT_TO_WORKER
STARTED
ACKNOWLEDGED
FINISHED
FAILED
RETRYING
RETRIED_BY_USER
CANCELLED
TIMEOUT_REFRESHED
SLOT_RELEASED
// NOTE: not actually step run events, but the table is currently named StepRunEvent
WORKFLOW_RUN_GROUP_KEY_SUCCEEDED
WORKFLOW_RUN_GROUP_KEY_FAILED
}
enum StepRunEventSeverity {
INFO
WARNING
CRITICAL
}
model StepRunEvent {
id BigInt @unique @default(autoincrement()) @db.BigInt
timeFirstSeen DateTime @default(now())
timeLastSeen DateTime @default(now())
// the parent step run
stepRunId String? @db.Uuid
// the parent workflow run
workflowRunId String? @db.Uuid
// the event reason
reason StepRunEventReason
// the event severity
severity StepRunEventSeverity
message String
count Int
data Json?
@@index([stepRunId])
@@index([workflowRunId])
}
model StepRunResultArchive {
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
deletedAt DateTime?
// the parent step run
stepRun StepRun @relation(fields: [stepRunId], references: [id], onDelete: Cascade, onUpdate: Cascade)
stepRunId String @db.Uuid
retryCount Int @default(0)
order BigInt @default(autoincrement()) @db.BigInt
// the run input
input Json?
// the run output
output Json?
// the run error
error String?
// the run started at
startedAt DateTime?
// the run finished at
finishedAt DateTime?
// the run timeout at
timeoutAt DateTime?
// the run cancelled at
cancelledAt DateTime?
// the reason for why the run was cancelled
cancelledReason String?
// errors while cancelling the run
cancelledError String?
}
model Dispatcher {
// base fields
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
deletedAt DateTime?
// the last heartbeat time
lastHeartbeatAt DateTime?
// whether this dispatcher is active or not
isActive Boolean @default(true)
// a list of workers connected to this dispatcher
workers Worker[]
}
model Ticker {
// base fields
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
// the last heartbeat time
lastHeartbeatAt DateTime?
// whether this ticker is active or not
isActive Boolean @default(true)
crons WorkflowTriggerCronRef[]
scheduled WorkflowTriggerScheduledRef[]
groupKeyRuns GetGroupKeyRun[]
tenantAlerts TenantAlertingSettings[]
}
enum WorkerLabelComparator {
EQUAL
NOT_EQUAL
GREATER_THAN
GREATER_THAN_OR_EQUAL
LESS_THAN
LESS_THAN_OR_EQUAL
}
model WorkerLabel {
id BigInt @id @default(autoincrement()) @db.BigInt
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
worker Worker @relation(fields: [workerId], references: [id], onDelete: Cascade, onUpdate: Cascade)
workerId String @db.Uuid
key String
strValue String?
intValue Int?
@@unique([workerId, key])
@@index([workerId])
}
enum WorkerType {
WEBHOOK
MANAGED
SELFHOSTED
}
enum WorkerSDKS {
UNKNOWN
GO
PYTHON
TYPESCRIPT
}
model Worker {
// base fields
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
deletedAt DateTime?
type WorkerType @default(SELFHOSTED)
labels WorkerLabel[]
// the parent tenant
tenantId String @db.Uuid
// the last heartbeat time
lastHeartbeatAt DateTime?
// whether the worker has been marked as paused
isPaused Boolean @default(false)
// whether this worker GRPC connection is active or not
isActive Boolean @default(false)
lastListenerEstablished DateTime?
// the worker name
name String
// the dispatcher the worker is connected to
dispatcher Dispatcher? @relation(fields: [dispatcherId], references: [id], onDelete: SetNull, onUpdate: Cascade)
dispatcherId String? @db.Uuid
maxRuns Int @default(100)
services Service[]
// the actions this worker can run
actions Action[]
// the jobs the worker has run
stepRuns StepRun[]
// the runs which retrieve the group keys
groupKeyRuns GetGroupKeyRun[]
assignedEvents WorkerAssignEvent[]
webhook WebhookWorker? @relation(fields: [webhookId], references: [id], onDelete: SetNull, onUpdate: Cascade)
webhookId String? @unique @db.Uuid
sdkVersion String?
language WorkerSDKS?
languageVersion String?
os String?
runtimeExtra String?
}
model WorkerAssignEvent {
id BigInt @id @default(autoincrement()) @db.BigInt
// the parent worker
worker Worker @relation(fields: [workerId], references: [id], onDelete: Cascade, onUpdate: Cascade)
workerId String @db.Uuid
assignedStepRuns Json?
@@index([workerId, id(order: Desc)])
}
model Service {
// base fields
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
deletedAt DateTime?
// the service name
name String
// the service description
description String?
tenant Tenant @relation(fields: [tenantId], references: [id], onDelete: Cascade, onUpdate: Cascade)
tenantId String @db.Uuid
// the service's workers
workers Worker[]
@@unique([tenantId, name])
}
enum VcsProvider {
GITHUB
}
model TenantVcsProvider {
// base fields
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
deletedAt DateTime?
// the parent tenant
tenant Tenant @relation(fields: [tenantId], references: [id], onDelete: Cascade, onUpdate: Cascade)
tenantId String @db.Uuid
vcsProvider VcsProvider
// the provider's configuration
config Json?
@@unique([tenantId, vcsProvider])
}
model TenantAlertEmailGroup {
// base fields
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
deletedAt DateTime?
// the parent tenant
tenant Tenant @relation(fields: [tenantId], references: [id], onDelete: Cascade, onUpdate: Cascade)
tenantId String @db.Uuid
// a list of comma-separated emails for the group
emails String
}
model SlackAppWebhook {
// base fields
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
deletedAt DateTime?
tenant Tenant @relation(fields: [tenantId], references: [id], onDelete: Cascade, onUpdate: Cascade)
tenantId String @db.Uuid
teamId String
teamName String
channelId String
channelName String
// encrypted at rest
webhookURL Bytes @db.ByteA
@@unique([tenantId, teamId, channelId])
}
enum LogLineLevel {
DEBUG
INFO
WARN
ERROR
}
model LogLine {
// base fields
id BigInt @id @default(autoincrement()) @db.BigInt
createdAt DateTime @default(now())
// the parent tenant
tenantId String @db.Uuid
// the step run id this log is associated with
stepRun StepRun? @relation(fields: [stepRunId], references: [id], onDelete: SetNull, onUpdate: Cascade)
stepRunId String? @db.Uuid
// the log line message
message String
// the log line level
level LogLineLevel @default(INFO)
// (optional) the log line metadata
metadata Json?
}
model StreamEvent {
// base fields
id BigInt @id @default(autoincrement()) @db.BigInt
createdAt DateTime @default(now())
// the parent tenant
tenantId String @db.Uuid
// the step run id this stream event is associated with
stepRun StepRun? @relation(fields: [stepRunId], references: [id], onDelete: SetNull, onUpdate: Cascade)
stepRunId String? @db.Uuid
// the stream event bytes
message Bytes
// (optional) the stream event metadata
metadata Json?
}
model SNSIntegration {
// base fields
id String @id @unique @default(uuid()) @db.Uuid
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
// the parent tenant
tenant Tenant @relation(fields: [tenantId], references: [id], onDelete: Cascade, onUpdate: Cascade)
tenantId String @db.Uuid
// the sns topic arn
topicArn String
@@unique([tenantId, topicArn])
}
model SecurityCheckIdent {
id String @id @unique @default(uuid()) @db.Uuid
}
enum LeaseKind {
WORKER
QUEUE
}
model Lease {
id BigInt @id @default(autoincrement()) @db.BigInt
// when the lease on the resource expires
expiresAt DateTime?
// the parent tenant
tenantId String @db.Uuid
// the resource being leased
resourceId String
// the kind of lease
kind LeaseKind
@@unique([tenantId, kind, resourceId])
}