refactor: remove foreign keys from unchanged/non-cascading parent tables (#918)

* refactor: remove fks from unchanged/non-cascading parent tables

* fix: cleanup cache for engine repository

* fix: remove streamevent
This commit is contained in:
abelanger5
2024-09-27 14:21:45 -04:00
committed by GitHub
parent 925b2654c8
commit 6172956bbd
13 changed files with 370 additions and 32967 deletions
-30
View File
@@ -63,36 +63,6 @@ func run(done chan<- string, job worker.WorkflowJob) (func() error, error) {
}
defer client.Disconnect()
// TODO check for the database status
events, err := client.Event.FindMany(
db.Event.TenantID.Equals(c.TenantId()),
db.Event.Key.Equals("user:create:timeout"),
).With(
db.Event.WorkflowRuns.Fetch().With(
db.WorkflowRunTriggeredBy.Parent.Fetch().With(
db.WorkflowRun.JobRuns.Fetch().With(
db.JobRun.StepRuns.Fetch(),
),
),
),
).Exec(context.Background())
if err != nil {
panic(fmt.Errorf("error finding events: %w", err))
}
for _, event := range events {
for _, workflowRun := range event.WorkflowRuns() {
for _, jobRuns := range workflowRun.Parent().JobRuns() {
for _, stepRun := range jobRuns.StepRuns() {
if stepRun.Status != db.StepRunStatusFailed {
panic(fmt.Errorf("expected step run to be failed, got %s", stepRun.Status))
}
}
}
}
}
done <- "done"
}()
-14
View File
@@ -115,12 +115,6 @@ func TestWebhook(t *testing.T) {
"webhook-step-one",
"webhook-step-two",
}, items)
verifyStepRuns(prisma, event, c.TenantId(), db.JobRunStatusSucceeded, db.StepRunStatusSucceeded, func(output string) {
if string(output) != `{"message":"hi from webhook-step-one"}` && string(output) != `{"message":"hi from webhook-step-two"}` {
panic(fmt.Errorf("expected step run output to be valid, got %s", output))
}
})
},
},
{
@@ -165,8 +159,6 @@ func TestWebhook(t *testing.T) {
if err != nil {
t.Fatalf("run() error = %s", err)
}
verifyStepRuns(prisma, event, c.TenantId(), db.JobRunStatusFailed, db.StepRunStatusFailed, nil)
},
},
{
@@ -267,12 +259,6 @@ func TestWebhook(t *testing.T) {
"wha-webhook-step-two",
"wha-webhook-action-1",
}, items)
verifyStepRuns(prisma, event, c.TenantId(), db.JobRunStatusSucceeded, db.StepRunStatusSucceeded, func(output string) {
if string(output) != `{"message":"hi from wha-webhook-step-one"}` && string(output) != `{"message":"hi from wha-webhook-step-two"}` && string(output) != `{"message":"hi from wha-webhook-action-1"}` {
panic(fmt.Errorf("expected step run output to be valid, got %s", output))
}
})
},
},
}
-55
View File
@@ -9,7 +9,6 @@ import (
"time"
"github.com/hatchet-dev/hatchet/pkg/client"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/db"
"github.com/hatchet-dev/hatchet/pkg/worker"
)
@@ -78,61 +77,7 @@ func run(
return fmt.Errorf("error pushing event: %w", err)
}
// TODO test for assigned status before it is started
//time.Sleep(2 * time.Second)
//verifyStepRuns(client, c.TenantId(), db.JobRunStatusRunning, db.StepRunStatusAssigned, nil)
time.Sleep(5 * time.Second)
return nil
}
func verifyStepRuns(prisma *db.PrismaClient, event string, tenantId string, jobRunStatus db.JobRunStatus, stepRunStatus db.StepRunStatus, check func(string)) {
events, err := prisma.Event.FindMany(
db.Event.TenantID.Equals(tenantId),
db.Event.Key.Equals(event),
).With(
db.Event.WorkflowRuns.Fetch().With(
db.WorkflowRunTriggeredBy.Parent.Fetch().With(
db.WorkflowRun.JobRuns.Fetch().With(
db.JobRun.StepRuns.Fetch(),
),
),
),
).Exec(context.Background())
if err != nil {
panic(fmt.Errorf("error finding events: %w", err))
}
if len(events) == 0 {
panic(fmt.Errorf("no events found"))
}
for _, event := range events {
if len(event.WorkflowRuns()) == 0 {
panic(fmt.Errorf("no workflow runs found"))
}
for _, workflowRun := range event.WorkflowRuns() {
if len(workflowRun.Parent().JobRuns()) == 0 {
panic(fmt.Errorf("no job runs found"))
}
for _, jobRuns := range workflowRun.Parent().JobRuns() {
if jobRuns.Status != jobRunStatus {
panic(fmt.Errorf("expected job run to be %s, got %s", jobRunStatus, jobRuns.Status))
}
for _, stepRun := range jobRuns.StepRuns() {
if stepRun.Status != stepRunStatus {
panic(fmt.Errorf("expected step run to be %s, got %s", stepRunStatus, stepRun.Status))
}
output, ok := stepRun.Output()
if check != nil {
if !ok {
panic(fmt.Errorf("expected step run to have output, got %+v", stepRun))
}
check(string(output))
}
}
}
}
}
}
+7 -1
View File
@@ -206,8 +206,14 @@ func GetDatabaseConfigFromConfigFile(cf *database.ConfigFile, runtime *server.Co
meter := metered.NewMetered(entitlementRepo, &l)
cleanupEngine, engineRepo := prisma.NewEngineRepository(pool, queuePool, runtime, prisma.WithLogger(&l), prisma.WithCache(ch), prisma.WithMetered(meter))
return &database.Config{
Disconnect: func() error {
if err := cleanupEngine(); err != nil {
return err
}
ch.Stop()
meter.Stop()
return c.Prisma.Disconnect()
@@ -215,7 +221,7 @@ func GetDatabaseConfigFromConfigFile(cf *database.ConfigFile, runtime *server.Co
Pool: pool,
QueuePool: queuePool,
APIRepository: prisma.NewAPIRepository(c, pool, prisma.WithLogger(&l), prisma.WithCache(ch), prisma.WithMetered(meter)),
EngineRepository: prisma.NewEngineRepository(pool, queuePool, runtime, prisma.WithLogger(&l), prisma.WithCache(ch), prisma.WithMetered(meter)),
EngineRepository: engineRepo,
EntitlementRepository: entitlementRepo,
Seed: cf.Seed,
}, nil
File diff suppressed because it is too large Load Diff
+3 -63
View File
@@ -1107,6 +1107,9 @@ CREATE INDEX "StepRunEvent_stepRunId_idx" ON "StepRunEvent"("stepRunId" ASC);
-- CreateIndex
CREATE INDEX "StepRunEvent_workflowRunId_idx" ON "StepRunEvent"("workflowRunId" ASC);
-- CreateIndex
CREATE INDEX "StepRunExpressionEval_stepRunId_idx" ON "StepRunExpressionEval"("stepRunId" ASC);
-- CreateIndex
CREATE UNIQUE INDEX "StepRunResultArchive_id_key" ON "StepRunResultArchive"("id" ASC);
@@ -1362,12 +1365,6 @@ ALTER TABLE "Action" ADD CONSTRAINT "Action_tenantId_fkey" FOREIGN KEY ("tenantI
-- AddForeignKey
ALTER TABLE "Event" ADD CONSTRAINT "Event_replayedFromId_fkey" FOREIGN KEY ("replayedFromId") REFERENCES "Event"("id") ON DELETE SET NULL ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "Event" ADD CONSTRAINT "Event_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "GetGroupKeyRun" ADD CONSTRAINT "GetGroupKeyRun_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "GetGroupKeyRun" ADD CONSTRAINT "GetGroupKeyRun_tickerId_fkey" FOREIGN KEY ("tickerId") REFERENCES "Ticker"("id") ON DELETE SET NULL ON UPDATE CASCADE;
@@ -1377,21 +1374,12 @@ ALTER TABLE "GetGroupKeyRun" ADD CONSTRAINT "GetGroupKeyRun_workerId_fkey" FOREI
-- AddForeignKey
ALTER TABLE "GetGroupKeyRun" ADD CONSTRAINT "GetGroupKeyRun_workflowRunId_fkey" FOREIGN KEY ("workflowRunId") REFERENCES "WorkflowRun"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "Job" ADD CONSTRAINT "Job_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "Job" ADD CONSTRAINT "Job_workflowVersionId_fkey" FOREIGN KEY ("workflowVersionId") REFERENCES "WorkflowVersion"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "JobRun" ADD CONSTRAINT "JobRun_jobId_fkey" FOREIGN KEY ("jobId") REFERENCES "Job"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "JobRun" ADD CONSTRAINT "JobRun_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "JobRun" ADD CONSTRAINT "JobRun_tickerId_fkey" FOREIGN KEY ("tickerId") REFERENCES "Ticker"("id") ON DELETE SET NULL ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "JobRun" ADD CONSTRAINT "JobRun_workflowRunId_fkey" FOREIGN KEY ("workflowRunId") REFERENCES "WorkflowRun"("id") ON DELETE CASCADE ON UPDATE CASCADE;
@@ -1404,12 +1392,6 @@ ALTER TABLE "JobRunLookupData" ADD CONSTRAINT "JobRunLookupData_tenantId_fkey" F
-- AddForeignKey
ALTER TABLE "LogLine" ADD CONSTRAINT "LogLine_stepRunId_fkey" FOREIGN KEY ("stepRunId") REFERENCES "StepRun"("id") ON DELETE SET NULL ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "LogLine" ADD CONSTRAINT "LogLine_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "RateLimit" ADD CONSTRAINT "RateLimit_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "SNSIntegration" ADD CONSTRAINT "SNSIntegration_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
@@ -1425,30 +1407,15 @@ ALTER TABLE "Step" ADD CONSTRAINT "Step_actionId_tenantId_fkey" FOREIGN KEY ("ac
-- AddForeignKey
ALTER TABLE "Step" ADD CONSTRAINT "Step_jobId_fkey" FOREIGN KEY ("jobId") REFERENCES "Job"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "Step" ADD CONSTRAINT "Step_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "StepDesiredWorkerLabel" ADD CONSTRAINT "StepDesiredWorkerLabel_stepId_fkey" FOREIGN KEY ("stepId") REFERENCES "Step"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "StepRateLimit" ADD CONSTRAINT "StepRateLimit_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "StepRateLimit" ADD CONSTRAINT "StepRateLimit_tenantId_rateLimitKey_fkey" FOREIGN KEY ("tenantId", "rateLimitKey") REFERENCES "RateLimit"("tenantId", "key") ON DELETE RESTRICT ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "StepRun" ADD CONSTRAINT "StepRun_jobRunId_fkey" FOREIGN KEY ("jobRunId") REFERENCES "JobRun"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "StepRun" ADD CONSTRAINT "StepRun_stepId_fkey" FOREIGN KEY ("stepId") REFERENCES "Step"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "StepRun" ADD CONSTRAINT "StepRun_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "StepRun" ADD CONSTRAINT "StepRun_tickerId_fkey" FOREIGN KEY ("tickerId") REFERENCES "Ticker"("id") ON DELETE SET NULL ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "StepRun" ADD CONSTRAINT "StepRun_workerId_fkey" FOREIGN KEY ("workerId") REFERENCES "Worker"("id") ON DELETE SET NULL ON UPDATE CASCADE;
@@ -1458,9 +1425,6 @@ ALTER TABLE "StepRunResultArchive" ADD CONSTRAINT "StepRunResultArchive_stepRunI
-- AddForeignKey
ALTER TABLE "StreamEvent" ADD CONSTRAINT "StreamEvent_stepRunId_fkey" FOREIGN KEY ("stepRunId") REFERENCES "StepRun"("id") ON DELETE SET NULL ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "StreamEvent" ADD CONSTRAINT "StreamEvent_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "Tenant" ADD CONSTRAINT "Tenant_controllerPartitionId_fkey" FOREIGN KEY ("controllerPartitionId") REFERENCES "ControllerPartition"("id") ON DELETE SET NULL ON UPDATE SET NULL;
@@ -1524,9 +1488,6 @@ ALTER TABLE "WebhookWorkerWorkflow" ADD CONSTRAINT "WebhookWorkerWorkflow_workfl
-- AddForeignKey
ALTER TABLE "Worker" ADD CONSTRAINT "Worker_dispatcherId_fkey" FOREIGN KEY ("dispatcherId") REFERENCES "Dispatcher"("id") ON DELETE SET NULL ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "Worker" ADD CONSTRAINT "Worker_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "Worker" ADD CONSTRAINT "Worker_webhookId_fkey" FOREIGN KEY ("webhookId") REFERENCES "WebhookWorker"("id") ON DELETE SET NULL ON UPDATE CASCADE;
@@ -1536,9 +1497,6 @@ ALTER TABLE "WorkerAssignEvent" ADD CONSTRAINT "WorkerAssignEvent_workerId_fkey"
-- AddForeignKey
ALTER TABLE "WorkerLabel" ADD CONSTRAINT "WorkerLabel_workerId_fkey" FOREIGN KEY ("workerId") REFERENCES "Worker"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "Workflow" ADD CONSTRAINT "Workflow_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "WorkflowConcurrency" ADD CONSTRAINT "WorkflowConcurrency_getConcurrencyGroupId_fkey" FOREIGN KEY ("getConcurrencyGroupId") REFERENCES "Action"("id") ON DELETE SET NULL ON UPDATE CASCADE;
@@ -1551,33 +1509,15 @@ ALTER TABLE "WorkflowRun" ADD CONSTRAINT "WorkflowRun_parentId_fkey" FOREIGN KEY
-- AddForeignKey
ALTER TABLE "WorkflowRun" ADD CONSTRAINT "WorkflowRun_parentStepRunId_fkey" FOREIGN KEY ("parentStepRunId") REFERENCES "StepRun"("id") ON DELETE SET NULL ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "WorkflowRun" ADD CONSTRAINT "WorkflowRun_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "WorkflowRun" ADD CONSTRAINT "WorkflowRun_workflowVersionId_fkey" FOREIGN KEY ("workflowVersionId") REFERENCES "WorkflowVersion"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "WorkflowRunDedupe" ADD CONSTRAINT "WorkflowRunDedupe_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "WorkflowRunStickyState" ADD CONSTRAINT "WorkflowRunStickyState_workflowRunId_fkey" FOREIGN KEY ("workflowRunId") REFERENCES "WorkflowRun"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "WorkflowRunTriggeredBy" ADD CONSTRAINT "WorkflowRunTriggeredBy_cronParentId_cronSchedule_fkey" FOREIGN KEY ("cronParentId", "cronSchedule") REFERENCES "WorkflowTriggerCronRef"("parentId", "cron") ON DELETE SET NULL ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "WorkflowRunTriggeredBy" ADD CONSTRAINT "WorkflowRunTriggeredBy_eventId_fkey" FOREIGN KEY ("eventId") REFERENCES "Event"("id") ON DELETE SET NULL ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "WorkflowRunTriggeredBy" ADD CONSTRAINT "WorkflowRunTriggeredBy_parentId_fkey" FOREIGN KEY ("parentId") REFERENCES "WorkflowRun"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "WorkflowRunTriggeredBy" ADD CONSTRAINT "WorkflowRunTriggeredBy_scheduledId_fkey" FOREIGN KEY ("scheduledId") REFERENCES "WorkflowTriggerScheduledRef"("id") ON DELETE SET NULL ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "WorkflowRunTriggeredBy" ADD CONSTRAINT "WorkflowRunTriggeredBy_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "WorkflowTag" ADD CONSTRAINT "WorkflowTag_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
+26 -21
View File
@@ -281,7 +281,7 @@ func (r *engineRepository) WebhookWorker() repository.WebhookWorkerEngineReposit
return r.webhookWorker
}
func NewEngineRepository(pool *pgxpool.Pool, queuePool *pgxpool.Pool, cf *server.ConfigFileRuntime, fs ...PrismaRepositoryOpt) repository.EngineRepository {
func NewEngineRepository(pool *pgxpool.Pool, queuePool *pgxpool.Pool, cf *server.ConfigFileRuntime, fs ...PrismaRepositoryOpt) (func() error, repository.EngineRepository) {
opts := defaultPrismaRepositoryOpts()
for _, f := range fs {
@@ -295,26 +295,31 @@ func NewEngineRepository(pool *pgxpool.Pool, queuePool *pgxpool.Pool, cf *server
opts.cache = cache.New(1 * time.Millisecond)
}
return &engineRepository{
health: NewHealthEngineRepository(pool),
apiToken: NewEngineTokenRepository(pool, opts.v, opts.l, opts.cache),
dispatcher: NewDispatcherRepository(pool, opts.v, opts.l),
event: NewEventEngineRepository(pool, opts.v, opts.l, opts.metered),
getGroupKeyRun: NewGetGroupKeyRunRepository(pool, opts.v, opts.l),
jobRun: NewJobRunEngineRepository(pool, opts.v, opts.l),
stepRun: NewStepRunEngineRepository(queuePool, opts.v, opts.l, cf),
step: NewStepRepository(pool, opts.v, opts.l),
tenant: NewTenantEngineRepository(pool, opts.v, opts.l, opts.cache),
tenantAlerting: NewTenantAlertingEngineRepository(pool, opts.v, opts.l, opts.cache),
ticker: NewTickerRepository(pool, opts.v, opts.l),
worker: NewWorkerEngineRepository(pool, opts.v, opts.l, opts.metered),
workflow: NewWorkflowEngineRepository(pool, opts.v, opts.l, opts.metered),
workflowRun: NewWorkflowRunEngineRepository(pool, opts.v, opts.l, opts.metered),
streamEvent: NewStreamEventsEngineRepository(pool, opts.v, opts.l),
log: NewLogEngineRepository(pool, opts.v, opts.l),
rateLimit: NewRateLimitEngineRepository(pool, opts.v, opts.l),
webhookWorker: NewWebhookWorkerEngineRepository(pool, opts.v, opts.l),
}
rlCache := cache.New(5 * time.Minute)
return func() error {
rlCache.Stop()
return nil
}, &engineRepository{
health: NewHealthEngineRepository(pool),
apiToken: NewEngineTokenRepository(pool, opts.v, opts.l, opts.cache),
dispatcher: NewDispatcherRepository(pool, opts.v, opts.l),
event: NewEventEngineRepository(pool, opts.v, opts.l, opts.metered),
getGroupKeyRun: NewGetGroupKeyRunRepository(pool, opts.v, opts.l),
jobRun: NewJobRunEngineRepository(pool, opts.v, opts.l),
stepRun: NewStepRunEngineRepository(queuePool, opts.v, opts.l, cf, rlCache),
step: NewStepRepository(pool, opts.v, opts.l),
tenant: NewTenantEngineRepository(pool, opts.v, opts.l, opts.cache),
tenantAlerting: NewTenantAlertingEngineRepository(pool, opts.v, opts.l, opts.cache),
ticker: NewTickerRepository(pool, opts.v, opts.l),
worker: NewWorkerEngineRepository(pool, opts.v, opts.l, opts.metered),
workflow: NewWorkflowEngineRepository(pool, opts.v, opts.l, opts.metered),
workflowRun: NewWorkflowRunEngineRepository(pool, opts.v, opts.l, opts.metered),
streamEvent: NewStreamEventsEngineRepository(pool, opts.v, opts.l),
log: NewLogEngineRepository(pool, opts.v, opts.l),
rateLimit: NewRateLimitEngineRepository(pool, opts.v, opts.l),
webhookWorker: NewWebhookWorkerEngineRepository(pool, opts.v, opts.l),
}
}
type entitlementRepository struct {
+60 -20
View File
@@ -20,6 +20,7 @@ import (
"github.com/hatchet-dev/hatchet/internal/telemetry"
"github.com/hatchet-dev/hatchet/pkg/config/server"
"github.com/hatchet-dev/hatchet/pkg/repository"
"github.com/hatchet-dev/hatchet/pkg/repository/cache"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/db"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/dbsqlc"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/sqlchelpers"
@@ -255,24 +256,26 @@ func (s *stepRunAPIRepository) ListStepRunArchives(tenantId string, stepRunId st
}
type stepRunEngineRepository struct {
pool *pgxpool.Pool
v validator.Validator
l *zerolog.Logger
queries *dbsqlc.Queries
cf *server.ConfigFileRuntime
cachedMinQueuedIds sync.Map
callbacks []repository.Callback[*dbsqlc.ResolveWorkflowRunStatusRow]
pool *pgxpool.Pool
v validator.Validator
l *zerolog.Logger
queries *dbsqlc.Queries
cf *server.ConfigFileRuntime
cachedMinQueuedIds sync.Map
cachedStepIdHasRateLimit *cache.Cache
callbacks []repository.Callback[*dbsqlc.ResolveWorkflowRunStatusRow]
}
func NewStepRunEngineRepository(pool *pgxpool.Pool, v validator.Validator, l *zerolog.Logger, cf *server.ConfigFileRuntime) repository.StepRunEngineRepository {
func NewStepRunEngineRepository(pool *pgxpool.Pool, v validator.Validator, l *zerolog.Logger, cf *server.ConfigFileRuntime, rlCache *cache.Cache) repository.StepRunEngineRepository {
queries := dbsqlc.New()
return &stepRunEngineRepository{
pool: pool,
v: v,
l: l,
queries: queries,
cf: cf,
pool: pool,
v: v,
l: l,
queries: queries,
cf: cf,
cachedStepIdHasRateLimit: rlCache,
}
}
@@ -1313,6 +1316,7 @@ func (s *stepRunEngineRepository) QueueStepRuns(ctx context.Context, qlp *zerolo
func (s *stepRunEngineRepository) getStepRunRateLimits(ctx context.Context, dbtx dbsqlc.DBTX, tenantId string, queueItems []*scheduling.QueueItemWithOrder) (map[string]map[string]int32, map[string]*dbsqlc.ListRateLimitsForTenantWithMutateRow, error) {
stepRunIds := make([]pgtype.UUID, 0, len(queueItems))
stepIds := make([]pgtype.UUID, 0, len(queueItems))
stepsWithRateLimits := make(map[string]bool)
for _, item := range queueItems {
stepRunIds = append(stepRunIds, item.StepRunId)
@@ -1320,15 +1324,32 @@ func (s *stepRunEngineRepository) getStepRunRateLimits(ctx context.Context, dbtx
}
stepIdToStepRuns := make(map[string][]string)
stepRunIdToStepId := make(map[string]string)
for i, stepRunId := range stepRunIds {
stepId := sqlchelpers.UUIDToStr(stepIds[i])
stepRunIdStr := sqlchelpers.UUIDToStr(stepRunId)
if _, ok := stepIdToStepRuns[stepId]; !ok {
stepIdToStepRuns[stepId] = make([]string, 0)
}
stepIdToStepRuns[stepId] = append(stepIdToStepRuns[stepId], sqlchelpers.UUIDToStr(stepRunId))
stepIdToStepRuns[stepId] = append(stepIdToStepRuns[stepId], stepRunIdStr)
stepRunIdToStepId[stepRunIdStr] = stepId
}
// check if we have any rate limits for these step ids
skipRateLimiting := true
for stepIdStr := range stepIdToStepRuns {
if hasRateLimit, ok := s.cachedStepIdHasRateLimit.Get(stepIdStr); !ok || hasRateLimit.(bool) {
skipRateLimiting = false
break
}
}
if skipRateLimiting {
return nil, nil, nil
}
// get all step run expression evals which correspond to rate limits, grouped by step run id
@@ -1348,6 +1369,8 @@ func (s *stepRunEngineRepository) getStepRunRateLimits(ctx context.Context, dbtx
// Only append if this is a key expression. Note that we have a uniqueness constraint on
// the stepRunId, kind, and key, so we will not insert duplicate values into the array.
if eval.Kind == dbsqlc.StepExpressionKindDYNAMICRATELIMITKEY {
stepsWithRateLimits[stepRunIdToStepId[stepRunId]] = true
k := eval.ValueStr.String
if _, ok := stepRunToKeys[stepRunId]; !ok {
@@ -1465,17 +1488,27 @@ func (s *stepRunEngineRepository) getStepRunRateLimits(ctx context.Context, dbtx
errData,
)
// upsert all rate limits based on the keys, limit values, and durations
err = s.queries.UpsertRateLimitsBulk(ctx, dbtx, upsertRateLimitBulkParams)
var stepRateLimits []*dbsqlc.StepRateLimit
if err != nil {
return nil, nil, fmt.Errorf("could not bulk upsert dynamic rate limits: %w", err)
if len(upsertRateLimitBulkParams.Keys) > 0 {
// upsert all rate limits based on the keys, limit values, and durations
err = s.queries.UpsertRateLimitsBulk(ctx, dbtx, upsertRateLimitBulkParams)
if err != nil {
return nil, nil, fmt.Errorf("could not bulk upsert dynamic rate limits: %w", err)
}
}
// get all existing static rate limits for steps to the mapping, mapping back from step ids to step run ids
stepRateLimits, err := s.queries.ListRateLimitsForSteps(ctx, dbtx, dbsqlc.ListRateLimitsForStepsParams{
uniqueStepIds := make([]pgtype.UUID, 0, len(stepIdToStepRuns))
for stepId := range stepIdToStepRuns {
uniqueStepIds = append(uniqueStepIds, sqlchelpers.UUIDFromStr(stepId))
}
stepRateLimits, err = s.queries.ListRateLimitsForSteps(ctx, dbtx, dbsqlc.ListRateLimitsForStepsParams{
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
Stepids: stepIds,
Stepids: uniqueStepIds,
})
if err != nil {
@@ -1483,6 +1516,7 @@ func (s *stepRunEngineRepository) getStepRunRateLimits(ctx context.Context, dbtx
}
for _, row := range stepRateLimits {
stepsWithRateLimits[sqlchelpers.UUIDToStr(row.StepId)] = true
stepId := sqlchelpers.UUIDToStr(row.StepId)
stepRuns := stepIdToStepRuns[stepId]
@@ -1507,6 +1541,12 @@ func (s *stepRunEngineRepository) getStepRunRateLimits(ctx context.Context, dbtx
mapRateLimitsForTenant[row.Key] = row
}
// store all step ids in the cache, so we can skip rate limiting for steps without rate limits
for stepId := range stepIdToStepRuns {
hasRateLimit := stepsWithRateLimits[stepId]
s.cachedStepIdHasRateLimit.Set(stepId, hasRateLimit)
}
return stepRunToKeyToUnits, mapRateLimitsForTenant, nil
}
@@ -0,0 +1,65 @@
-- DropForeignKey
ALTER TABLE "Event" DROP CONSTRAINT "Event_tenantId_fkey";
-- DropForeignKey
ALTER TABLE "GetGroupKeyRun" DROP CONSTRAINT "GetGroupKeyRun_tenantId_fkey";
-- DropForeignKey
ALTER TABLE "Job" DROP CONSTRAINT "Job_tenantId_fkey";
-- DropForeignKey
ALTER TABLE "JobRun" DROP CONSTRAINT "JobRun_tenantId_fkey";
-- DropForeignKey
ALTER TABLE "JobRun" DROP CONSTRAINT "JobRun_tickerId_fkey";
-- DropForeignKey
ALTER TABLE "LogLine" DROP CONSTRAINT "LogLine_tenantId_fkey";
-- DropForeignKey
ALTER TABLE "RateLimit" DROP CONSTRAINT "RateLimit_tenantId_fkey";
-- DropForeignKey
ALTER TABLE "Step" DROP CONSTRAINT "Step_tenantId_fkey";
-- DropForeignKey
ALTER TABLE "StepRateLimit" DROP CONSTRAINT "StepRateLimit_tenantId_fkey";
-- DropForeignKey
ALTER TABLE "StepRun" DROP CONSTRAINT "StepRun_stepId_fkey";
-- DropForeignKey
ALTER TABLE "StepRun" DROP CONSTRAINT "StepRun_tenantId_fkey";
-- DropForeignKey
ALTER TABLE "StepRun" DROP CONSTRAINT "StepRun_tickerId_fkey";
-- DropForeignKey
ALTER TABLE "StreamEvent" DROP CONSTRAINT "StreamEvent_tenantId_fkey";
-- DropForeignKey
ALTER TABLE "Worker" DROP CONSTRAINT "Worker_tenantId_fkey";
-- DropForeignKey
ALTER TABLE "Workflow" DROP CONSTRAINT "Workflow_tenantId_fkey";
-- DropForeignKey
ALTER TABLE "WorkflowRun" DROP CONSTRAINT "WorkflowRun_tenantId_fkey";
-- DropForeignKey
ALTER TABLE "WorkflowRun" DROP CONSTRAINT "WorkflowRun_workflowVersionId_fkey";
-- DropForeignKey
ALTER TABLE "WorkflowRunDedupe" DROP CONSTRAINT "WorkflowRunDedupe_tenantId_fkey";
-- DropForeignKey
ALTER TABLE "WorkflowRunTriggeredBy" DROP CONSTRAINT "WorkflowRunTriggeredBy_eventId_fkey";
-- DropForeignKey
ALTER TABLE "WorkflowRunTriggeredBy" DROP CONSTRAINT "WorkflowRunTriggeredBy_parentId_fkey";
-- DropForeignKey
ALTER TABLE "WorkflowRunTriggeredBy" DROP CONSTRAINT "WorkflowRunTriggeredBy_tenantId_fkey";
-- CreateIndex
CREATE INDEX "StepRunExpressionEval_stepRunId_idx" ON "StepRunExpressionEval"("stepRunId");
+21 -69
View File
@@ -205,39 +205,24 @@ model Tenant {
// The data retention period for deletable resources. This is a Go duration string.
dataRetentionPeriod String @default("720h")
events Event[]
workflows Workflow[]
jobs Job[]
steps Step[]
triggers WorkflowTriggers[]
workflowRuns WorkflowRun[]
workflowRunTriggers WorkflowRunTriggeredBy[]
jobRuns JobRun[]
jobRunLookupDatas JobRunLookupData[]
stepRuns StepRun[]
workers Worker[]
members TenantMember[]
workflowTags WorkflowTag[]
actions Action[]
services Service[]
invites TenantInviteLink[]
apiTokens APIToken[]
groupKeyRuns GetGroupKeyRun[]
vcsProviders TenantVcsProvider[]
streamEvents StreamEvent[]
logs LogLine[]
snsIntegrations SNSIntegration[]
rateLimits RateLimit[]
stepRateLimits StepRateLimit[]
alertEmailGroups TenantAlertEmailGroup[]
triggers WorkflowTriggers[]
jobRunLookupDatas JobRunLookupData[]
members TenantMember[]
workflowTags WorkflowTag[]
actions Action[]
services Service[]
invites TenantInviteLink[]
apiTokens APIToken[]
vcsProviders TenantVcsProvider[]
snsIntegrations SNSIntegration[]
alertEmailGroups TenantAlertEmailGroup[]
// alertMemberEmails controls whether to send alert emails to tenant members in addition to the alert email groups
alertMemberEmails Boolean @default(true)
slackWebhooks SlackAppWebhook[]
alertingSettings TenantAlertingSettings?
limits TenantResourceLimit[]
limitAlerts TenantResourceLimitAlert[]
webhookWorkers WebhookWorker[]
dedupes WorkflowRunDedupe[]
alertMemberEmails Boolean @default(true)
slackWebhooks SlackAppWebhook[]
alertingSettings TenantAlertingSettings?
limits TenantResourceLimit[]
limitAlerts TenantResourceLimitAlert[]
webhookWorkers WebhookWorker[]
@@index([controllerPartitionId])
@@index([workerPartitionId])
@@ -425,7 +410,6 @@ model Event {
key String
// the parent tenant
tenant Tenant @relation(fields: [tenantId], references: [id], onDelete: Cascade, onUpdate: Cascade)
tenantId String @db.Uuid
// the event which was replayed
@@ -441,9 +425,6 @@ model Event {
// metadata stored in the event
additionalMetadata Json?
// the workflow runs that were triggered by this event
workflowRuns WorkflowRunTriggeredBy[]
@@index([tenantId])
@@index([createdAt])
@@index([tenantId, createdAt])
@@ -480,7 +461,6 @@ model Workflow {
deletedAt DateTime?
// the parent tenant
tenant Tenant @relation(fields: [tenantId], references: [id], onDelete: Cascade, onUpdate: Cascade)
tenantId String @db.Uuid
// the workflow name
@@ -545,9 +525,6 @@ model WorkflowVersion {
onFailureJob Job? @relation("OnFailureJob", fields: [onFailureJobId], references: [id])
onFailureJobId String? @unique @db.Uuid
// all runs for the workflow
runs WorkflowRun[]
// the scheduled runs for the workflow
scheduled WorkflowTriggerScheduledRef[]
@@ -711,7 +688,6 @@ model Job {
deletedAt DateTime?
// the parent tenant
tenant Tenant @relation(fields: [tenantId], references: [id], onDelete: Cascade, onUpdate: Cascade)
tenantId String @db.Uuid
// the parent workflow
@@ -800,7 +776,6 @@ model Step {
readableId String?
// the parent tenant
tenant Tenant @relation(fields: [tenantId], references: [id], onDelete: Cascade, onUpdate: Cascade)
tenantId String @db.Uuid
// the parent job
@@ -824,9 +799,6 @@ model Step {
// a list of dependencies for this step
parents Step[] @relation("StepOrder")
// a list of runs for this step
stepRuns StepRun[]
// the default amount of time to wait while scheduling a step run
scheduleTimeout String @default("5m")
@@ -854,7 +826,6 @@ model StepRateLimit {
kind StepRateLimitKind @default(STATIC)
tenant Tenant @relation(fields: [tenantId], references: [id], onDelete: Cascade, onUpdate: Cascade)
tenantId String @db.Uuid
@@unique([stepId, rateLimitKey])
@@ -862,7 +833,6 @@ model StepRateLimit {
model RateLimit {
// the parent tenant
tenant Tenant @relation(fields: [tenantId], references: [id], onDelete: Cascade, onUpdate: Cascade)
tenantId String @db.Uuid
// the rate limit key
@@ -921,12 +891,10 @@ model WorkflowRun {
displayName String?
// the parent tenant
tenant Tenant @relation(fields: [tenantId], references: [id], onDelete: Cascade, onUpdate: Cascade)
tenantId String @db.Uuid
// the parent workflow
workflowVersion WorkflowVersion @relation(fields: [workflowVersionId], references: [id], onDelete: Cascade, onUpdate: Cascade)
workflowVersionId String @db.Uuid
workflowVersionId String @db.Uuid
concurrencyGroupId String?
@@ -936,8 +904,6 @@ model WorkflowRun {
jobRuns JobRun[]
triggeredBy WorkflowRunTriggeredBy?
sticky WorkflowRunStickyState?
// the run error
@@ -991,7 +957,6 @@ model WorkflowRunDedupe {
updatedAt DateTime @default(now()) @updatedAt
// the parent tenant
tenant Tenant @relation(fields: [tenantId], references: [id], onDelete: Cascade, onUpdate: Cascade)
tenantId String @db.Uuid
// the parent workflow
@@ -1016,7 +981,6 @@ model GetGroupKeyRun {
deletedAt DateTime? // TODO verify we're setting this
// the parent tenant
tenant Tenant @relation(fields: [tenantId], references: [id], onDelete: Cascade, onUpdate: Cascade)
tenantId String @db.Uuid
// the parent workflow run
@@ -1083,19 +1047,16 @@ model WorkflowRunTriggeredBy {
updatedAt DateTime @default(now()) @updatedAt
deletedAt DateTime?
// the tenant (needed for unique constraint)
tenant Tenant @relation(fields: [tenantId], references: [id], onDelete: Cascade, onUpdate: Cascade)
// the tenant
tenantId String @db.Uuid
// the parent workflow run
parent WorkflowRun @relation(fields: [parentId], references: [id], onDelete: Cascade, onUpdate: Cascade)
parentId String @unique @db.Uuid
parentId String @unique @db.Uuid
// the input if this was triggered manually
input Json?
// the parent event
event Event? @relation(fields: [eventId], references: [id])
eventId String? @db.Uuid
// the cron reference that triggered this workflow
@@ -1128,7 +1089,6 @@ model JobRun {
deletedAt DateTime?
// the parent tenant
tenant Tenant @relation(fields: [tenantId], references: [id], onDelete: Cascade, onUpdate: Cascade)
tenantId String @db.Uuid
// the parent workflow run
@@ -1140,7 +1100,6 @@ model JobRun {
jobId String @db.Uuid
// the assigned ticker
ticker Ticker? @relation(fields: [tickerId], references: [id])
tickerId String? @db.Uuid
stepRuns StepRun[]
@@ -1239,6 +1198,7 @@ model StepRunExpressionEval {
kind StepExpressionKind
@@id([key, stepRunId, kind])
@@index([stepRunId])
}
model StepRun {
@@ -1249,7 +1209,6 @@ model StepRun {
deletedAt DateTime?
// the parent tenant
tenant Tenant @relation(fields: [tenantId], references: [id], onDelete: Cascade, onUpdate: Cascade)
tenantId String @db.Uuid
// the parent job run
@@ -1257,7 +1216,6 @@ model StepRun {
jobRunId String @db.Uuid
// the parent step
step Step @relation(fields: [stepId], references: [id], onDelete: Cascade, onUpdate: Cascade)
stepId String @db.Uuid
// a list of dependents for this step
@@ -1276,7 +1234,6 @@ model StepRun {
workerId String? @db.Uuid
// the assigned ticker
ticker Ticker? @relation(fields: [tickerId], references: [id])
tickerId String? @db.Uuid
// the run status
@@ -1568,8 +1525,6 @@ model Ticker {
// whether this ticker is active or not
isActive Boolean @default(true)
jobRuns JobRun[]
stepRuns StepRun[]
crons WorkflowTriggerCronRef[]
scheduled WorkflowTriggerScheduledRef[]
groupKeyRuns GetGroupKeyRun[]
@@ -1620,7 +1575,6 @@ model Worker {
labels WorkerLabel[]
// the parent tenant
tenant Tenant @relation(fields: [tenantId], references: [id], onDelete: Cascade, onUpdate: Cascade)
tenantId String @db.Uuid
// the last heartbeat time
@@ -1767,7 +1721,6 @@ model LogLine {
createdAt DateTime @default(now())
// the parent tenant
tenant Tenant @relation(fields: [tenantId], references: [id], onDelete: Cascade, onUpdate: Cascade)
tenantId String @db.Uuid
// the step run id this log is associated with
@@ -1790,7 +1743,6 @@ model StreamEvent {
createdAt DateTime @default(now())
// the parent tenant
tenant Tenant @relation(fields: [tenantId], references: [id], onDelete: Cascade, onUpdate: Cascade)
tenantId String @db.Uuid
// the step run id this stream event is associated with
+32
View File
@@ -0,0 +1,32 @@
-- Modify "Event" table
ALTER TABLE "Event" DROP CONSTRAINT "Event_tenantId_fkey";
-- Modify "GetGroupKeyRun" table
ALTER TABLE "GetGroupKeyRun" DROP CONSTRAINT "GetGroupKeyRun_tenantId_fkey";
-- Modify "Job" table
ALTER TABLE "Job" DROP CONSTRAINT "Job_tenantId_fkey";
-- Modify "JobRun" table
ALTER TABLE "JobRun" DROP CONSTRAINT "JobRun_tenantId_fkey", DROP CONSTRAINT "JobRun_tickerId_fkey";
-- Modify "LogLine" table
ALTER TABLE "LogLine" DROP CONSTRAINT "LogLine_tenantId_fkey";
-- Modify "RateLimit" table
ALTER TABLE "RateLimit" DROP CONSTRAINT "RateLimit_tenantId_fkey";
-- Modify "Step" table
ALTER TABLE "Step" DROP CONSTRAINT "Step_tenantId_fkey";
-- Modify "StepRateLimit" table
ALTER TABLE "StepRateLimit" DROP CONSTRAINT "StepRateLimit_tenantId_fkey";
-- Modify "StepRun" table
ALTER TABLE "StepRun" DROP CONSTRAINT "StepRun_stepId_fkey", DROP CONSTRAINT "StepRun_tenantId_fkey", DROP CONSTRAINT "StepRun_tickerId_fkey";
-- Create index "StepRunExpressionEval_stepRunId_idx" to table: "StepRunExpressionEval"
CREATE INDEX "StepRunExpressionEval_stepRunId_idx" ON "StepRunExpressionEval" ("stepRunId");
-- Modify "StreamEvent" table
ALTER TABLE "StreamEvent" DROP CONSTRAINT "StreamEvent_tenantId_fkey";
-- Modify "Worker" table
ALTER TABLE "Worker" DROP CONSTRAINT "Worker_tenantId_fkey";
-- Modify "Workflow" table
ALTER TABLE "Workflow" DROP CONSTRAINT "Workflow_tenantId_fkey";
-- Modify "WorkflowRun" table
ALTER TABLE "WorkflowRun" DROP CONSTRAINT "WorkflowRun_tenantId_fkey", DROP CONSTRAINT "WorkflowRun_workflowVersionId_fkey";
-- Modify "WorkflowRunDedupe" table
ALTER TABLE "WorkflowRunDedupe" DROP CONSTRAINT "WorkflowRunDedupe_tenantId_fkey";
-- Modify "WorkflowRunTriggeredBy" table
ALTER TABLE "WorkflowRunTriggeredBy" DROP CONSTRAINT "WorkflowRunTriggeredBy_eventId_fkey", DROP CONSTRAINT "WorkflowRunTriggeredBy_parentId_fkey", DROP CONSTRAINT "WorkflowRunTriggeredBy_tenantId_fkey";
+2 -1
View File
@@ -1,4 +1,4 @@
h1:FBWU1es/2kg6DEkbbqCtMNYYZ/r3ccSojg3sWKe7ZGQ=
h1:lF5HY8eyRauexSPXlJPoY2e6Vu064xoJ2h5WzA2nkRI=
20240115180414_init.sql h1:Ef3ZyjAHkmJPdGF/dEWCahbwgcg6uGJKnDxW2JCRi2k=
20240122014727_v0_6_0.sql h1:o/LdlteAeFgoHJ3e/M4Xnghqt9826IE/Y/h0q95Acuo=
20240126235456_v0_7_0.sql h1:KiVzt/hXgQ6esbdC6OMJOOWuYEXmy1yeCpmsVAHTFKs=
@@ -59,3 +59,4 @@ h1:FBWU1es/2kg6DEkbbqCtMNYYZ/r3ccSojg3sWKe7ZGQ=
20240918162532_v0.45.0.sql h1:pOhBg/58SnD8qBpDx9OdxTFfjRxEanJrJF7rLEvUFSY=
20240923124809_v0.45.4.sql h1:SqSSiWebWGiDg978uLwiaWfcJkR5tm+8HQZOmCiOABQ=
20240926210650_v0.47.0.sql h1:Wm6QJ7GIFIsM9eym6e+PB3g/C9mM3tUoogxiBFD3E0w=
20240927172935_v0.47.1.sql h1:UlcoRDWx5xCoNjcYTjBsWkdbFZSuVJWlWZOzhTq8u/Y=
+3 -63
View File
@@ -1107,6 +1107,9 @@ CREATE INDEX "StepRunEvent_stepRunId_idx" ON "StepRunEvent"("stepRunId" ASC);
-- CreateIndex
CREATE INDEX "StepRunEvent_workflowRunId_idx" ON "StepRunEvent"("workflowRunId" ASC);
-- CreateIndex
CREATE INDEX "StepRunExpressionEval_stepRunId_idx" ON "StepRunExpressionEval"("stepRunId" ASC);
-- CreateIndex
CREATE UNIQUE INDEX "StepRunResultArchive_id_key" ON "StepRunResultArchive"("id" ASC);
@@ -1362,12 +1365,6 @@ ALTER TABLE "Action" ADD CONSTRAINT "Action_tenantId_fkey" FOREIGN KEY ("tenantI
-- AddForeignKey
ALTER TABLE "Event" ADD CONSTRAINT "Event_replayedFromId_fkey" FOREIGN KEY ("replayedFromId") REFERENCES "Event"("id") ON DELETE SET NULL ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "Event" ADD CONSTRAINT "Event_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "GetGroupKeyRun" ADD CONSTRAINT "GetGroupKeyRun_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "GetGroupKeyRun" ADD CONSTRAINT "GetGroupKeyRun_tickerId_fkey" FOREIGN KEY ("tickerId") REFERENCES "Ticker"("id") ON DELETE SET NULL ON UPDATE CASCADE;
@@ -1377,21 +1374,12 @@ ALTER TABLE "GetGroupKeyRun" ADD CONSTRAINT "GetGroupKeyRun_workerId_fkey" FOREI
-- AddForeignKey
ALTER TABLE "GetGroupKeyRun" ADD CONSTRAINT "GetGroupKeyRun_workflowRunId_fkey" FOREIGN KEY ("workflowRunId") REFERENCES "WorkflowRun"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "Job" ADD CONSTRAINT "Job_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "Job" ADD CONSTRAINT "Job_workflowVersionId_fkey" FOREIGN KEY ("workflowVersionId") REFERENCES "WorkflowVersion"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "JobRun" ADD CONSTRAINT "JobRun_jobId_fkey" FOREIGN KEY ("jobId") REFERENCES "Job"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "JobRun" ADD CONSTRAINT "JobRun_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "JobRun" ADD CONSTRAINT "JobRun_tickerId_fkey" FOREIGN KEY ("tickerId") REFERENCES "Ticker"("id") ON DELETE SET NULL ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "JobRun" ADD CONSTRAINT "JobRun_workflowRunId_fkey" FOREIGN KEY ("workflowRunId") REFERENCES "WorkflowRun"("id") ON DELETE CASCADE ON UPDATE CASCADE;
@@ -1404,12 +1392,6 @@ ALTER TABLE "JobRunLookupData" ADD CONSTRAINT "JobRunLookupData_tenantId_fkey" F
-- AddForeignKey
ALTER TABLE "LogLine" ADD CONSTRAINT "LogLine_stepRunId_fkey" FOREIGN KEY ("stepRunId") REFERENCES "StepRun"("id") ON DELETE SET NULL ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "LogLine" ADD CONSTRAINT "LogLine_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "RateLimit" ADD CONSTRAINT "RateLimit_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "SNSIntegration" ADD CONSTRAINT "SNSIntegration_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
@@ -1425,30 +1407,15 @@ ALTER TABLE "Step" ADD CONSTRAINT "Step_actionId_tenantId_fkey" FOREIGN KEY ("ac
-- AddForeignKey
ALTER TABLE "Step" ADD CONSTRAINT "Step_jobId_fkey" FOREIGN KEY ("jobId") REFERENCES "Job"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "Step" ADD CONSTRAINT "Step_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "StepDesiredWorkerLabel" ADD CONSTRAINT "StepDesiredWorkerLabel_stepId_fkey" FOREIGN KEY ("stepId") REFERENCES "Step"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "StepRateLimit" ADD CONSTRAINT "StepRateLimit_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "StepRateLimit" ADD CONSTRAINT "StepRateLimit_tenantId_rateLimitKey_fkey" FOREIGN KEY ("tenantId", "rateLimitKey") REFERENCES "RateLimit"("tenantId", "key") ON DELETE RESTRICT ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "StepRun" ADD CONSTRAINT "StepRun_jobRunId_fkey" FOREIGN KEY ("jobRunId") REFERENCES "JobRun"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "StepRun" ADD CONSTRAINT "StepRun_stepId_fkey" FOREIGN KEY ("stepId") REFERENCES "Step"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "StepRun" ADD CONSTRAINT "StepRun_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "StepRun" ADD CONSTRAINT "StepRun_tickerId_fkey" FOREIGN KEY ("tickerId") REFERENCES "Ticker"("id") ON DELETE SET NULL ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "StepRun" ADD CONSTRAINT "StepRun_workerId_fkey" FOREIGN KEY ("workerId") REFERENCES "Worker"("id") ON DELETE SET NULL ON UPDATE CASCADE;
@@ -1458,9 +1425,6 @@ ALTER TABLE "StepRunResultArchive" ADD CONSTRAINT "StepRunResultArchive_stepRunI
-- AddForeignKey
ALTER TABLE "StreamEvent" ADD CONSTRAINT "StreamEvent_stepRunId_fkey" FOREIGN KEY ("stepRunId") REFERENCES "StepRun"("id") ON DELETE SET NULL ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "StreamEvent" ADD CONSTRAINT "StreamEvent_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "Tenant" ADD CONSTRAINT "Tenant_controllerPartitionId_fkey" FOREIGN KEY ("controllerPartitionId") REFERENCES "ControllerPartition"("id") ON DELETE SET NULL ON UPDATE SET NULL;
@@ -1524,9 +1488,6 @@ ALTER TABLE "WebhookWorkerWorkflow" ADD CONSTRAINT "WebhookWorkerWorkflow_workfl
-- AddForeignKey
ALTER TABLE "Worker" ADD CONSTRAINT "Worker_dispatcherId_fkey" FOREIGN KEY ("dispatcherId") REFERENCES "Dispatcher"("id") ON DELETE SET NULL ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "Worker" ADD CONSTRAINT "Worker_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "Worker" ADD CONSTRAINT "Worker_webhookId_fkey" FOREIGN KEY ("webhookId") REFERENCES "WebhookWorker"("id") ON DELETE SET NULL ON UPDATE CASCADE;
@@ -1536,9 +1497,6 @@ ALTER TABLE "WorkerAssignEvent" ADD CONSTRAINT "WorkerAssignEvent_workerId_fkey"
-- AddForeignKey
ALTER TABLE "WorkerLabel" ADD CONSTRAINT "WorkerLabel_workerId_fkey" FOREIGN KEY ("workerId") REFERENCES "Worker"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "Workflow" ADD CONSTRAINT "Workflow_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "WorkflowConcurrency" ADD CONSTRAINT "WorkflowConcurrency_getConcurrencyGroupId_fkey" FOREIGN KEY ("getConcurrencyGroupId") REFERENCES "Action"("id") ON DELETE SET NULL ON UPDATE CASCADE;
@@ -1551,33 +1509,15 @@ ALTER TABLE "WorkflowRun" ADD CONSTRAINT "WorkflowRun_parentId_fkey" FOREIGN KEY
-- AddForeignKey
ALTER TABLE "WorkflowRun" ADD CONSTRAINT "WorkflowRun_parentStepRunId_fkey" FOREIGN KEY ("parentStepRunId") REFERENCES "StepRun"("id") ON DELETE SET NULL ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "WorkflowRun" ADD CONSTRAINT "WorkflowRun_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "WorkflowRun" ADD CONSTRAINT "WorkflowRun_workflowVersionId_fkey" FOREIGN KEY ("workflowVersionId") REFERENCES "WorkflowVersion"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "WorkflowRunDedupe" ADD CONSTRAINT "WorkflowRunDedupe_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "WorkflowRunStickyState" ADD CONSTRAINT "WorkflowRunStickyState_workflowRunId_fkey" FOREIGN KEY ("workflowRunId") REFERENCES "WorkflowRun"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "WorkflowRunTriggeredBy" ADD CONSTRAINT "WorkflowRunTriggeredBy_cronParentId_cronSchedule_fkey" FOREIGN KEY ("cronParentId", "cronSchedule") REFERENCES "WorkflowTriggerCronRef"("parentId", "cron") ON DELETE SET NULL ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "WorkflowRunTriggeredBy" ADD CONSTRAINT "WorkflowRunTriggeredBy_eventId_fkey" FOREIGN KEY ("eventId") REFERENCES "Event"("id") ON DELETE SET NULL ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "WorkflowRunTriggeredBy" ADD CONSTRAINT "WorkflowRunTriggeredBy_parentId_fkey" FOREIGN KEY ("parentId") REFERENCES "WorkflowRun"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "WorkflowRunTriggeredBy" ADD CONSTRAINT "WorkflowRunTriggeredBy_scheduledId_fkey" FOREIGN KEY ("scheduledId") REFERENCES "WorkflowTriggerScheduledRef"("id") ON DELETE SET NULL ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "WorkflowRunTriggeredBy" ADD CONSTRAINT "WorkflowRunTriggeredBy_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "WorkflowTag" ADD CONSTRAINT "WorkflowTag_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;