feat: tenant partitioning (#649)

* feat: tenant partitioning

* fix: rebalance inactive partitions, split into separate partitioner

* fix: shutdown partitioner scheduler properly

* update config options

* fix: config options linting
This commit is contained in:
abelanger5
2024-06-26 17:06:51 -04:00
committed by GitHub
parent 68176b725c
commit f2c6bc1f44
30 changed files with 11215 additions and 233 deletions

View File

@@ -57,7 +57,6 @@ tasks:
- sudo sh ./hack/dev/manage-hosts.sh add 127.0.0.1 app.dev.hatchet-tools.com
prisma-migrate:
cmds:
- go run github.com/steebchen/prisma-client-go migrate dev --create-only --skip-generate --name "{{.CLI_ARGS}}"
- task: generate-sqlc
- sh ./hack/dev/atlas-migrate.sh {{.CLI_ARGS}}
- DATABASE_URL='postgresql://hatchet:hatchet@127.0.0.1:5431/hatchet' sh ./hack/db/atlas-apply.sh
@@ -67,7 +66,7 @@ tasks:
- task: generate-sqlc
seed-dev:
cmds:
- sh ./hack/dev/run-go-with-env.sh run github.com/steebchen/prisma-client-go migrate deploy
- sh ./hack/dev/run-go-with-env.sh run github.com/steebchen/prisma-client-go migrate dev --skip-generate
- SEED_DEVELOPMENT=true sh ./hack/dev/run-go-with-env.sh run ./cmd/hatchet-admin seed
start-dev:
deps:
@@ -137,7 +136,7 @@ tasks:
- sh ./generate.sh
generate-sqlc:
cmds:
- DATABASE_URL='postgresql://hatchet:hatchet@127.0.0.1:5431/shadow' npx --yes prisma migrate deploy
- DATABASE_URL='postgresql://hatchet:hatchet@127.0.0.1:5431/shadow' npx --yes prisma migrate dev --skip-generate
- DATABASE_URL='postgresql://hatchet:hatchet@127.0.0.1:5431/shadow' npx --yes prisma migrate diff --from-empty --to-schema-datasource prisma/schema.prisma --script > sql/schema/schema.sql
- cp sql/schema/schema.sql pkg/repository/prisma/dbsqlc/schema.sql
- go run github.com/sqlc-dev/sqlc/cmd/sqlc@v1.24.0 generate --file pkg/repository/prisma/dbsqlc/sqlc.yaml

View File

@@ -11,6 +11,7 @@ import (
"github.com/hatchet-dev/hatchet/api/v1/server/oas/transformers"
"github.com/hatchet-dev/hatchet/pkg/repository"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/db"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/sqlchelpers"
)
func (t *TenantService) TenantCreate(ctx echo.Context, request gen.TenantCreateRequestObject) (gen.TenantCreateResponseObject, error) {
@@ -55,14 +56,16 @@ func (t *TenantService) TenantCreate(ctx echo.Context, request gen.TenantCreateR
return nil, err
}
err = t.config.EntitlementRepository.TenantLimit().SelectOrInsertTenantLimits(context.Background(), tenant.ID, nil)
tenantId := sqlchelpers.UUIDToStr(tenant.ID)
err = t.config.EntitlementRepository.TenantLimit().SelectOrInsertTenantLimits(context.Background(), tenantId, nil)
if err != nil {
return nil, err
}
// add the user as an owner of the tenant
_, err = t.config.APIRepository.Tenant().CreateTenantMember(tenant.ID, &repository.CreateTenantMemberOpts{
_, err = t.config.APIRepository.Tenant().CreateTenantMember(tenantId, &repository.CreateTenantMemberOpts{
UserId: user.ID,
Role: "OWNER",
})
@@ -71,7 +74,7 @@ func (t *TenantService) TenantCreate(ctx echo.Context, request gen.TenantCreateR
return nil, err
}
t.config.Analytics.Tenant(tenant.ID, map[string]interface{}{
t.config.Analytics.Tenant(tenantId, map[string]interface{}{
"name": tenant.Name,
"slug": tenant.Slug,
})
@@ -79,11 +82,11 @@ func (t *TenantService) TenantCreate(ctx echo.Context, request gen.TenantCreateR
t.config.Analytics.Enqueue(
"tenant:create",
user.ID,
&tenant.ID,
&tenantId,
nil,
)
return gen.TenantCreate200JSONResponse(
*transformers.ToTenant(tenant),
*transformers.ToTenantSqlc(tenant),
), nil
}

View File

@@ -19,6 +19,16 @@ func ToTenant(tenant *db.TenantModel) *gen.Tenant {
}
}
func ToTenantSqlc(tenant *dbsqlc.Tenant) *gen.Tenant {
return &gen.Tenant{
Metadata: *toAPIMetadata(sqlchelpers.UUIDToStr(tenant.ID), tenant.CreatedAt.Time, tenant.UpdatedAt.Time),
Name: tenant.Name,
Slug: tenant.Slug,
AnalyticsOptOut: &tenant.AnalyticsOptOut,
AlertMemberEmails: &tenant.AlertMemberEmails,
}
}
func ToTenantAlertingSettings(alerting *db.TenantAlertingSettingsModel) *gen.TenantAlertingSettings {
res := &gen.TenantAlertingSettings{
Metadata: *toAPIMetadata(alerting.ID, alerting.CreatedAt, alerting.UpdatedAt),

View File

@@ -86,7 +86,7 @@ func runSeed(cf *loader.ConfigLoader) error {
if errors.Is(err, db.ErrNotFound) {
// seed an example tenant
// initialize a tenant
tenant, err = dc.APIRepository.Tenant().CreateTenant(&repository.CreateTenantOpts{
sqlcTenant, err := dc.APIRepository.Tenant().CreateTenant(&repository.CreateTenantOpts{
ID: &dc.Seed.DefaultTenantID,
Name: dc.Seed.DefaultTenantName,
Slug: dc.Seed.DefaultTenantSlug,
@@ -96,6 +96,12 @@ func runSeed(cf *loader.ConfigLoader) error {
return err
}
tenant, err = dc.APIRepository.Tenant().GetTenantByID(sqlchelpers.UUIDToStr(sqlcTenant.ID))
if err != nil {
return err
}
fmt.Println("created tenant", tenant.ID)
// add the user to the tenant

View File

@@ -0,0 +1,135 @@
package engine
import (
"context"
"fmt"
"time"
"github.com/go-co-op/gocron/v2"
"github.com/google/uuid"
"github.com/hatchet-dev/hatchet/pkg/repository"
)
type partitioner struct {
s gocron.Scheduler
repo repository.TenantEngineRepository
}
func newPartitioner(repo repository.TenantEngineRepository) (partitioner, error) {
s, err := gocron.NewScheduler(gocron.WithLocation(time.UTC))
if err != nil {
return partitioner{}, err
}
return partitioner{s: s, repo: repo}, nil
}
func (p *partitioner) withControllers(ctx context.Context) (*Teardown, string, error) {
partitionId := uuid.New().String()
err := p.repo.CreateControllerPartition(ctx, partitionId)
if err != nil {
return nil, "", fmt.Errorf("could not create engine partition: %w", err)
}
// rebalance partitions on startup
err = p.repo.RebalanceAllControllerPartitions(ctx)
if err != nil {
return nil, "", fmt.Errorf("could not rebalance engine partitions: %w", err)
}
_, err = p.s.NewJob(
gocron.DurationJob(time.Minute*1),
gocron.NewTask(
func() {
rebalanceControllerPartitions(ctx, p.repo) // nolint: errcheck
},
),
)
if err != nil {
return nil, "", fmt.Errorf("could not create rebalance controller partitions job: %w", err)
}
return &Teardown{
Name: "partition teardown",
Fn: func() error {
deleteCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err := p.repo.DeleteControllerPartition(deleteCtx, partitionId)
if err != nil {
return fmt.Errorf("could not delete controller partition: %w", err)
}
return p.repo.RebalanceAllControllerPartitions(deleteCtx)
},
}, partitionId, nil
}
func (p *partitioner) withTenantWorkers(ctx context.Context) (*Teardown, string, error) {
partitionId := uuid.New().String()
err := p.repo.CreateTenantWorkerPartition(ctx, partitionId)
if err != nil {
return nil, "", fmt.Errorf("could not create engine partition: %w", err)
}
// rebalance partitions on startup
err = p.repo.RebalanceAllTenantWorkerPartitions(ctx)
if err != nil {
return nil, "", fmt.Errorf("could not rebalance engine partitions: %w", err)
}
_, err = p.s.NewJob(
gocron.DurationJob(time.Minute*1),
gocron.NewTask(
func() {
rebalanceTenantWorkerPartitions(ctx, p.repo) // nolint: errcheck
},
),
)
if err != nil {
return nil, "", fmt.Errorf("could not create rebalance tenant worker partitions job: %w", err)
}
return &Teardown{
Name: "partition teardown",
Fn: func() error {
deleteCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err := p.repo.DeleteTenantWorkerPartition(deleteCtx, partitionId)
if err != nil {
return fmt.Errorf("could not delete worker partition: %w", err)
}
return p.repo.RebalanceAllTenantWorkerPartitions(deleteCtx)
},
}, partitionId, nil
}
func (p *partitioner) start() {
p.s.Start()
}
func (p *partitioner) shutdown() error {
return p.s.Shutdown()
}
func rebalanceControllerPartitions(ctx context.Context, r repository.TenantEngineRepository) error {
return r.RebalanceInactiveControllerPartitions(ctx)
}
func rebalanceTenantWorkerPartitions(ctx context.Context, r repository.TenantEngineRepository) error {
return r.RebalanceInactiveTenantWorkerPartitions(ctx)
}

View File

@@ -104,8 +104,19 @@ func RunWithConfig(ctx context.Context, sc *server.ServerConfig) ([]Teardown, er
return nil, fmt.Errorf("could not initialize tracer: %w", err)
}
p, err := newPartitioner(sc.EngineRepository.Tenant())
if err != nil {
return nil, fmt.Errorf("could not create partitioner: %w", err)
}
teardown := []Teardown{}
teardown = append(teardown, Teardown{
Name: "partitioner",
Fn: p.shutdown,
})
var h *health.Health
healthProbes := sc.HasService("health")
if healthProbes {
@@ -165,47 +176,55 @@ func RunWithConfig(ctx context.Context, sc *server.ServerConfig) ([]Teardown, er
})
}
if sc.HasService("jobscontroller") {
if sc.HasService("queue") {
partitionTeardown, partitionId, err := p.withControllers(ctx)
if err != nil {
return nil, fmt.Errorf("could not create rebalance controller partitions job: %w", err)
}
teardown = append(teardown, *partitionTeardown)
jc, err := jobs.New(
jobs.WithAlerter(sc.Alerter),
jobs.WithMessageQueue(sc.MessageQueue),
jobs.WithRepository(sc.EngineRepository),
jobs.WithLogger(sc.Logger),
jobs.WithPartitionId(partitionId),
)
if err != nil {
return nil, fmt.Errorf("could not create jobs controller: %w", err)
}
cleanup, err := jc.Start()
cleanupJobs, err := jc.Start()
if err != nil {
return nil, fmt.Errorf("could not start jobs controller: %w", err)
}
teardown = append(teardown, Teardown{
Name: "jobs controller",
Fn: cleanup,
Fn: cleanupJobs,
})
}
if sc.HasService("workflowscontroller") {
wc, err := workflows.New(
workflows.WithAlerter(sc.Alerter),
workflows.WithMessageQueue(sc.MessageQueue),
workflows.WithRepository(sc.EngineRepository),
workflows.WithLogger(sc.Logger),
workflows.WithTenantAlerter(sc.TenantAlerter),
workflows.WithPartitionId(partitionId),
)
if err != nil {
return nil, fmt.Errorf("could not create workflows controller: %w", err)
}
cleanup, err := wc.Start()
cleanupWorkflows, err := wc.Start()
if err != nil {
return nil, fmt.Errorf("could not start workflows controller: %w", err)
}
teardown = append(teardown, Teardown{
Name: "workflows controller",
Fn: cleanup,
Fn: cleanupWorkflows,
})
}
@@ -338,7 +357,15 @@ func RunWithConfig(ctx context.Context, sc *server.ServerConfig) ([]Teardown, er
}
if sc.HasService("webhookscontroller") {
wh := webhooks.New(sc)
partitionTeardown, partitionId, err := p.withTenantWorkers(ctx)
if err != nil {
return nil, fmt.Errorf("could not create rebalance controller partitions job: %w", err)
}
teardown = append(teardown, *partitionTeardown)
wh := webhooks.New(sc, partitionId)
cleanup, err := wh.Start()
if err != nil {
@@ -364,6 +391,8 @@ func RunWithConfig(ctx context.Context, sc *server.ServerConfig) ([]Teardown, er
h.SetReady(true)
}
p.start()
<-ctx.Done()
if healthProbes {

View File

@@ -19,8 +19,8 @@ This document outlines the environment variables used to configure the server. T
## Services Configuration
| Variable | Description | Default Value |
| ----------------- | ------------------------ | -------------------------------------------------------------------------------------------------------------------------------- |
| `SERVER_SERVICES` | List of enabled services | `["health", "ticker", "grpc", "eventscontroller", "jobscontroller", "workflowscontroller", "webhookscontroller", "heartbeater"]` |
| ----------------- | ------------------------ | ------------------------------------------------------------------------------------------------ |
| `SERVER_SERVICES` | List of enabled services | `["health", "ticker", "grpc", "eventscontroller", "queue", "webhookscontroller", "heartbeater"]` |
## Encryption Configuration

View File

@@ -39,6 +39,7 @@ type JobsControllerImpl struct {
dv datautils.DataDecoderValidator
s gocron.Scheduler
a *hatcheterrors.Wrapped
partitionId string
}
type JobsControllerOpt func(*JobsControllerOpts)
@@ -49,6 +50,7 @@ type JobsControllerOpts struct {
repo repository.EngineRepository
dv datautils.DataDecoderValidator
alerter hatcheterrors.Alerter
partitionId string
}
func defaultJobsControllerOpts() *JobsControllerOpts {
@@ -86,6 +88,12 @@ func WithRepository(r repository.EngineRepository) JobsControllerOpt {
}
}
func WithPartitionId(pid string) JobsControllerOpt {
return func(opts *JobsControllerOpts) {
opts.partitionId = pid
}
}
func WithDataDecoderValidator(dv datautils.DataDecoderValidator) JobsControllerOpt {
return func(opts *JobsControllerOpts) {
opts.dv = dv
@@ -107,6 +115,10 @@ func New(fs ...JobsControllerOpt) (*JobsControllerImpl, error) {
return nil, fmt.Errorf("repository is required. use WithRepository")
}
if opts.partitionId == "" {
return nil, errors.New("partition id is required. use WithPartitionId")
}
newLogger := opts.l.With().Str("service", "jobs-controller").Logger()
opts.l = &newLogger
@@ -126,6 +138,7 @@ func New(fs ...JobsControllerOpt) (*JobsControllerImpl, error) {
dv: opts.dv,
s: s,
a: a,
partitionId: opts.partitionId,
}, nil
}
@@ -568,7 +581,7 @@ func (jc *JobsControllerImpl) runStepRunRequeue(ctx context.Context, startedAt t
jc.l.Debug().Msgf("jobs controller: checking step run requeue")
// list all tenants
tenants, err := jc.repo.Tenant().ListTenants(ctx)
tenants, err := jc.repo.Tenant().ListTenantsByControllerPartition(ctx, jc.partitionId)
if err != nil {
jc.l.Err(err).Msg("could not list tenants")
@@ -652,7 +665,7 @@ func (jc *JobsControllerImpl) runStepRunReassign(ctx context.Context, startedAt
jc.l.Debug().Msgf("jobs controller: checking step run reassignment")
// list all tenants
tenants, err := jc.repo.Tenant().ListTenants(ctx)
tenants, err := jc.repo.Tenant().ListTenantsByControllerPartition(ctx, jc.partitionId)
if err != nil {
jc.l.Err(err).Msg("could not list tenants")

View File

@@ -36,6 +36,7 @@ type WorkflowsControllerImpl struct {
s gocron.Scheduler
tenantAlerter *alerting.TenantAlertManager
a *hatcheterrors.Wrapped
partitionId string
}
type WorkflowsControllerOpt func(*WorkflowsControllerOpts)
@@ -47,6 +48,7 @@ type WorkflowsControllerOpts struct {
dv datautils.DataDecoderValidator
ta *alerting.TenantAlertManager
alerter hatcheterrors.Alerter
partitionId string
}
func defaultWorkflowsControllerOpts() *WorkflowsControllerOpts {
@@ -96,6 +98,12 @@ func WithTenantAlerter(ta *alerting.TenantAlertManager) WorkflowsControllerOpt {
}
}
func WithPartitionId(partitionId string) WorkflowsControllerOpt {
return func(opts *WorkflowsControllerOpts) {
opts.partitionId = partitionId
}
}
func New(fs ...WorkflowsControllerOpt) (*WorkflowsControllerImpl, error) {
opts := defaultWorkflowsControllerOpts()
@@ -115,6 +123,10 @@ func New(fs ...WorkflowsControllerOpt) (*WorkflowsControllerImpl, error) {
return nil, fmt.Errorf("tenant alerter is required. use WithTenantAlerter")
}
if opts.partitionId == "" {
return nil, fmt.Errorf("partition ID is required. use WithPartitionId")
}
s, err := gocron.NewScheduler(gocron.WithLocation(time.UTC))
if err != nil {
@@ -135,6 +147,7 @@ func New(fs ...WorkflowsControllerOpt) (*WorkflowsControllerImpl, error) {
s: s,
tenantAlerter: opts.ta,
a: a,
partitionId: opts.partitionId,
}, nil
}

View File

@@ -214,7 +214,7 @@ func (wc *WorkflowsControllerImpl) scheduleGetGroupAction(
getGroupKeyRunId := sqlchelpers.UUIDToStr(getGroupKeyRun.GetGroupKeyRun.ID)
workflowRunId := sqlchelpers.UUIDToStr(getGroupKeyRun.WorkflowRunId)
getGroupKeyRun, err := wc.repo.GetGroupKeyRun().UpdateGetGroupKeyRun(ctx, tenantId, getGroupKeyRunId, &repository.UpdateGetGroupKeyRunOpts{
_, err := wc.repo.GetGroupKeyRun().UpdateGetGroupKeyRun(ctx, tenantId, getGroupKeyRunId, &repository.UpdateGetGroupKeyRunOpts{
Status: repository.StepRunStatusPtr(db.StepRunStatusPendingAssignment),
})
@@ -335,7 +335,7 @@ func (wc *WorkflowsControllerImpl) runGetGroupKeyRunRequeue(ctx context.Context)
wc.l.Debug().Msgf("workflows controller: checking get group key run requeue")
// list all tenants
tenants, err := wc.repo.Tenant().ListTenants(ctx)
tenants, err := wc.repo.Tenant().ListTenantsByControllerPartition(ctx, wc.partitionId)
if err != nil {
wc.l.Err(err).Msg("could not list tenants")
@@ -423,7 +423,7 @@ func (wc *WorkflowsControllerImpl) runGetGroupKeyRunReassign(ctx context.Context
wc.l.Debug().Msgf("workflows controller: checking get group key run reassign")
// list all tenants
tenants, err := wc.repo.Tenant().ListTenants(ctx)
tenants, err := wc.repo.Tenant().ListTenantsByControllerPartition(ctx, wc.partitionId)
if err != nil {
wc.l.Err(err).Msg("could not list tenants")

View File

@@ -22,13 +22,15 @@ type WebhooksController struct {
sc *server.ServerConfig
registeredWorkerIds map[string]bool
cleanups map[string]func() error
partitionId string
}
func New(sc *server.ServerConfig) *WebhooksController {
func New(sc *server.ServerConfig, partitionId string) *WebhooksController {
return &WebhooksController{
sc: sc,
registeredWorkerIds: map[string]bool{},
cleanups: map[string]func() error{},
partitionId: partitionId,
}
}
@@ -64,20 +66,15 @@ func (c *WebhooksController) Start() (func() error, error) {
}
func (c *WebhooksController) check() error {
tenants, err := c.sc.EngineRepository.Tenant().ListTenants(context.Background())
if err != nil {
return fmt.Errorf("could not list tenants: %w", err)
}
wws, err := c.sc.EngineRepository.WebhookWorker().ListWebhookWorkersByPartitionId(context.Background(), c.partitionId)
for _, tenant := range tenants {
tenantId := sqlchelpers.UUIDToStr(tenant.ID)
wws, err := c.sc.EngineRepository.WebhookWorker().ListWebhookWorkers(context.Background(), tenantId)
if err != nil {
return fmt.Errorf("could not get webhook workers: %w", err)
}
for _, ww := range wws {
tenantId := sqlchelpers.UUIDToStr(ww.TenantId)
ww := ww
go func() {
id := sqlchelpers.UUIDToStr(ww.ID)
@@ -172,7 +169,6 @@ func (c *WebhooksController) check() error {
}
}()
}
}
return nil
}

View File

@@ -37,7 +37,7 @@ type ServerConfigFile struct {
MessageQueue MessageQueueConfigFile `mapstructure:"msgQueue" json:"msgQueue,omitempty"`
Services []string `mapstructure:"services" json:"services,omitempty" default:"[\"health\", \"ticker\", \"grpc\", \"eventscontroller\", \"jobscontroller\", \"workflowscontroller\", \"webhookscontroller\", \"heartbeater\"]"`
Services []string `mapstructure:"services" json:"services,omitempty" default:"[\"health\", \"ticker\", \"grpc\", \"eventscontroller\", \"queue\", \"webhookscontroller\", \"heartbeater\"]"`
TLS shared.TLSConfigFile `mapstructure:"tls" json:"tls,omitempty"`

File diff suppressed because it is too large Load Diff

View File

@@ -612,6 +612,13 @@ type ActionToWorker struct {
A pgtype.UUID `json:"A"`
}
type ControllerPartition struct {
ID string `json:"id"`
CreatedAt pgtype.Timestamp `json:"createdAt"`
UpdatedAt pgtype.Timestamp `json:"updatedAt"`
LastHeartbeat pgtype.Timestamp `json:"lastHeartbeat"`
}
type Dispatcher struct {
ID pgtype.UUID `json:"id"`
CreatedAt pgtype.Timestamp `json:"createdAt"`
@@ -867,6 +874,8 @@ type Tenant struct {
Slug string `json:"slug"`
AnalyticsOptOut bool `json:"analyticsOptOut"`
AlertMemberEmails bool `json:"alertMemberEmails"`
ControllerPartitionId pgtype.Text `json:"controllerPartitionId"`
WorkerPartitionId pgtype.Text `json:"workerPartitionId"`
}
type TenantAlertEmailGroup struct {
@@ -949,6 +958,13 @@ type TenantVcsProvider struct {
Config []byte `json:"config"`
}
type TenantWorkerPartition struct {
ID string `json:"id"`
CreatedAt pgtype.Timestamp `json:"createdAt"`
UpdatedAt pgtype.Timestamp `json:"updatedAt"`
LastHeartbeat pgtype.Timestamp `json:"lastHeartbeat"`
}
type Ticker struct {
ID pgtype.UUID `json:"id"`
CreatedAt pgtype.Timestamp `json:"createdAt"`

View File

@@ -61,6 +61,16 @@ CREATE TABLE "Action" (
CONSTRAINT "Action_pkey" PRIMARY KEY ("id")
);
-- CreateTable
CREATE TABLE "ControllerPartition" (
"id" TEXT NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"lastHeartbeat" TIMESTAMP(3),
CONSTRAINT "ControllerPartition_pkey" PRIMARY KEY ("id")
);
-- CreateTable
CREATE TABLE "Dispatcher" (
"id" UUID NOT NULL,
@@ -350,6 +360,8 @@ CREATE TABLE "Tenant" (
"slug" TEXT NOT NULL,
"analyticsOptOut" BOOLEAN NOT NULL DEFAULT false,
"alertMemberEmails" BOOLEAN NOT NULL DEFAULT true,
"controllerPartitionId" TEXT,
"workerPartitionId" TEXT,
CONSTRAINT "Tenant_pkey" PRIMARY KEY ("id")
);
@@ -455,6 +467,16 @@ CREATE TABLE "TenantVcsProvider" (
CONSTRAINT "TenantVcsProvider_pkey" PRIMARY KEY ("id")
);
-- CreateTable
CREATE TABLE "TenantWorkerPartition" (
"id" TEXT NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"lastHeartbeat" TIMESTAMP(3),
CONSTRAINT "TenantWorkerPartition_pkey" PRIMARY KEY ("id")
);
-- CreateTable
CREATE TABLE "Ticker" (
"id" UUID NOT NULL,
@@ -744,6 +766,9 @@ CREATE UNIQUE INDEX "Action_id_key" ON "Action"("id" ASC);
-- CreateIndex
CREATE UNIQUE INDEX "Action_tenantId_actionId_key" ON "Action"("tenantId" ASC, "actionId" ASC);
-- CreateIndex
CREATE UNIQUE INDEX "ControllerPartition_id_key" ON "ControllerPartition"("id" ASC);
-- CreateIndex
CREATE UNIQUE INDEX "Dispatcher_id_key" ON "Dispatcher"("id" ASC);
@@ -837,12 +862,18 @@ CREATE INDEX "StepRunEvent_stepRunId_idx" ON "StepRunEvent"("stepRunId" ASC);
-- CreateIndex
CREATE UNIQUE INDEX "StepRunResultArchive_id_key" ON "StepRunResultArchive"("id" ASC);
-- CreateIndex
CREATE INDEX "Tenant_controllerPartitionId_idx" ON "Tenant"("controllerPartitionId" ASC);
-- CreateIndex
CREATE UNIQUE INDEX "Tenant_id_key" ON "Tenant"("id" ASC);
-- CreateIndex
CREATE UNIQUE INDEX "Tenant_slug_key" ON "Tenant"("slug" ASC);
-- CreateIndex
CREATE INDEX "Tenant_workerPartitionId_idx" ON "Tenant"("workerPartitionId" ASC);
-- CreateIndex
CREATE UNIQUE INDEX "TenantAlertEmailGroup_id_key" ON "TenantAlertEmailGroup"("id" ASC);
@@ -876,6 +907,9 @@ CREATE UNIQUE INDEX "TenantVcsProvider_id_key" ON "TenantVcsProvider"("id" ASC);
-- CreateIndex
CREATE UNIQUE INDEX "TenantVcsProvider_tenantId_vcsProvider_key" ON "TenantVcsProvider"("tenantId" ASC, "vcsProvider" ASC);
-- CreateIndex
CREATE UNIQUE INDEX "TenantWorkerPartition_id_key" ON "TenantWorkerPartition"("id" ASC);
-- CreateIndex
CREATE UNIQUE INDEX "Ticker_id_key" ON "Ticker"("id" ASC);
@@ -1125,6 +1159,12 @@ ALTER TABLE "StreamEvent" ADD CONSTRAINT "StreamEvent_stepRunId_fkey" FOREIGN KE
-- AddForeignKey
ALTER TABLE "StreamEvent" ADD CONSTRAINT "StreamEvent_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "Tenant" ADD CONSTRAINT "Tenant_controllerPartitionId_fkey" FOREIGN KEY ("controllerPartitionId") REFERENCES "ControllerPartition"("id") ON DELETE SET NULL ON UPDATE SET NULL;
-- AddForeignKey
ALTER TABLE "Tenant" ADD CONSTRAINT "Tenant_workerPartitionId_fkey" FOREIGN KEY ("workerPartitionId") REFERENCES "TenantWorkerPartition"("id") ON DELETE SET NULL ON UPDATE SET NULL;
-- AddForeignKey
ALTER TABLE "TenantAlertEmailGroup" ADD CONSTRAINT "TenantAlertEmailGroup_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;

View File

@@ -1,9 +1,72 @@
-- name: CreateTenant :one
WITH active_controller_partitions AS (
SELECT
"id"
FROM
"ControllerPartition"
WHERE
"lastHeartbeat" > NOW() - INTERVAL '1 minute'
)
INSERT INTO "Tenant" ("id", "name", "slug", "controllerPartitionId")
VALUES (
sqlc.arg('id')::uuid,
sqlc.arg('name')::text,
sqlc.arg('slug')::text,
(
SELECT
"id"
FROM
active_controller_partitions
ORDER BY
random()
LIMIT 1
)
)
RETURNING *;
-- name: CreateTenantAlertingSettings :one
INSERT INTO "TenantAlertingSettings" ("id", "tenantId")
VALUES (gen_random_uuid(), sqlc.arg('tenantId')::uuid)
RETURNING *;
-- name: ListTenants :many
SELECT
*
FROM
"Tenant" as tenants;
-- name: ListTenantsByControllerPartitionId :many
WITH update_partition AS (
UPDATE
"ControllerPartition"
SET
"lastHeartbeat" = NOW()
WHERE
"id" = sqlc.arg('controllerPartitionId')::text
)
SELECT
*
FROM
"Tenant" as tenants
WHERE
"controllerPartitionId" = sqlc.arg('controllerPartitionId')::text;
-- name: ListTenantsByTenantWorkerPartitionId :many
WITH update_partition AS (
UPDATE
"TenantWorkerPartition"
SET
"lastHeartbeat" = NOW()
WHERE
"id" = sqlc.arg('workerPartitionId')::text
)
SELECT
*
FROM
"Tenant" as tenants
WHERE
"workerPartitionId" = sqlc.arg('workerPartitionId')::text;
-- name: GetTenantByID :one
SELECT
*
@@ -128,3 +191,159 @@ LEFT JOIN
"StepRun" as stepRun ON jobRun."id" = stepRun."jobRunId"
GROUP BY
runs."workflowId";
-- name: CreateControllerPartition :one
INSERT INTO "ControllerPartition" ("id", "createdAt", "lastHeartbeat")
VALUES (sqlc.arg('id')::text, NOW(), NOW())
ON CONFLICT DO NOTHING
RETURNING *;
-- name: DeleteControllerPartition :one
DELETE FROM "ControllerPartition"
WHERE "id" = sqlc.arg('id')::text
RETURNING *;
-- name: RebalanceAllControllerPartitions :exec
WITH active_partitions AS (
SELECT
"id"
FROM
"ControllerPartition"
WHERE
"lastHeartbeat" > NOW() - INTERVAL '1 minute'
)
UPDATE
"Tenant" as tenants
SET
"controllerPartitionId" = (
SELECT
"id"
FROM
active_partitions
ORDER BY
random()
LIMIT 1
)
WHERE
"slug" != 'internal' AND
(
"controllerPartitionId" IS NULL OR
"controllerPartitionId" NOT IN (SELECT "id" FROM active_partitions)
)
RETURNING *;
-- name: RebalanceInactiveControllerPartitions :exec
WITH active_partitions AS (
SELECT
"id"
FROM
"ControllerPartition"
WHERE
"lastHeartbeat" > NOW() - INTERVAL '1 minute'
), inactive_partitions AS (
SELECT
"id"
FROM
"ControllerPartition"
WHERE
"lastHeartbeat" <= NOW() - INTERVAL '1 minute'
), update_tenants AS (
UPDATE
"Tenant" as tenants
SET
"controllerPartitionId" = (
SELECT
"id"
FROM
active_partitions
ORDER BY
random()
LIMIT 1
)
WHERE
"slug" != 'internal' AND
(
"controllerPartitionId" IS NULL OR
"controllerPartitionId" IN (SELECT "id" FROM inactive_partitions)
)
)
DELETE FROM "ControllerPartition"
WHERE "id" IN (SELECT "id" FROM inactive_partitions);
-- name: CreateTenantWorkerPartition :one
INSERT INTO "TenantWorkerPartition" ("id", "createdAt", "lastHeartbeat")
VALUES (sqlc.arg('id')::text, NOW(), NOW())
ON CONFLICT DO NOTHING
RETURNING *;
-- name: DeleteTenantWorkerPartition :one
DELETE FROM "TenantWorkerPartition"
WHERE "id" = sqlc.arg('id')::text
RETURNING *;
-- name: RebalanceAllTenantWorkerPartitions :exec
WITH active_partitions AS (
SELECT
"id"
FROM
"TenantWorkerPartition"
WHERE
"lastHeartbeat" > NOW() - INTERVAL '1 minute'
)
UPDATE
"Tenant" as tenants
SET
"workerPartitionId" = (
SELECT
"id"
FROM
active_partitions
ORDER BY
random()
LIMIT 1
)
WHERE
"slug" != 'internal' AND
(
"workerPartitionId" IS NULL OR
"workerPartitionId" NOT IN (SELECT "id" FROM active_partitions)
)
RETURNING *;
-- name: RebalanceInactiveTenantWorkerPartitions :exec
WITH active_partitions AS (
SELECT
"id"
FROM
"TenantWorkerPartition"
WHERE
"lastHeartbeat" > NOW() - INTERVAL '1 minute'
), inactive_partitions AS (
SELECT
"id"
FROM
"TenantWorkerPartition"
WHERE
"lastHeartbeat" <= NOW() - INTERVAL '1 minute'
), update_tenants AS (
UPDATE
"Tenant" as tenants
SET
"workerPartitionId" = (
SELECT
"id"
FROM
active_partitions
ORDER BY
random()
LIMIT 1
)
WHERE
"slug" != 'internal' AND
(
"workerPartitionId" IS NULL OR
"workerPartitionId" IN (SELECT "id" FROM inactive_partitions)
)
)
DELETE FROM "TenantWorkerPartition"
WHERE "id" IN (SELECT "id" FROM inactive_partitions);

View File

@@ -11,6 +11,156 @@ import (
"github.com/jackc/pgx/v5/pgtype"
)
const createControllerPartition = `-- name: CreateControllerPartition :one
INSERT INTO "ControllerPartition" ("id", "createdAt", "lastHeartbeat")
VALUES ($1::text, NOW(), NOW())
ON CONFLICT DO NOTHING
RETURNING id, "createdAt", "updatedAt", "lastHeartbeat"
`
func (q *Queries) CreateControllerPartition(ctx context.Context, db DBTX, id string) (*ControllerPartition, error) {
row := db.QueryRow(ctx, createControllerPartition, id)
var i ControllerPartition
err := row.Scan(
&i.ID,
&i.CreatedAt,
&i.UpdatedAt,
&i.LastHeartbeat,
)
return &i, err
}
const createTenant = `-- name: CreateTenant :one
WITH active_controller_partitions AS (
SELECT
"id"
FROM
"ControllerPartition"
WHERE
"lastHeartbeat" > NOW() - INTERVAL '1 minute'
)
INSERT INTO "Tenant" ("id", "name", "slug", "controllerPartitionId")
VALUES (
$1::uuid,
$2::text,
$3::text,
(
SELECT
"id"
FROM
active_controller_partitions
ORDER BY
random()
LIMIT 1
)
)
RETURNING id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId"
`
type CreateTenantParams struct {
ID pgtype.UUID `json:"id"`
Name string `json:"name"`
Slug string `json:"slug"`
}
func (q *Queries) CreateTenant(ctx context.Context, db DBTX, arg CreateTenantParams) (*Tenant, error) {
row := db.QueryRow(ctx, createTenant, arg.ID, arg.Name, arg.Slug)
var i Tenant
err := row.Scan(
&i.ID,
&i.CreatedAt,
&i.UpdatedAt,
&i.DeletedAt,
&i.Name,
&i.Slug,
&i.AnalyticsOptOut,
&i.AlertMemberEmails,
&i.ControllerPartitionId,
&i.WorkerPartitionId,
)
return &i, err
}
const createTenantAlertingSettings = `-- name: CreateTenantAlertingSettings :one
INSERT INTO "TenantAlertingSettings" ("id", "tenantId")
VALUES (gen_random_uuid(), $1::uuid)
RETURNING id, "createdAt", "updatedAt", "deletedAt", "tenantId", "maxFrequency", "lastAlertedAt", "tickerId", "enableExpiringTokenAlerts", "enableWorkflowRunFailureAlerts", "enableTenantResourceLimitAlerts"
`
func (q *Queries) CreateTenantAlertingSettings(ctx context.Context, db DBTX, tenantid pgtype.UUID) (*TenantAlertingSettings, error) {
row := db.QueryRow(ctx, createTenantAlertingSettings, tenantid)
var i TenantAlertingSettings
err := row.Scan(
&i.ID,
&i.CreatedAt,
&i.UpdatedAt,
&i.DeletedAt,
&i.TenantId,
&i.MaxFrequency,
&i.LastAlertedAt,
&i.TickerId,
&i.EnableExpiringTokenAlerts,
&i.EnableWorkflowRunFailureAlerts,
&i.EnableTenantResourceLimitAlerts,
)
return &i, err
}
const createTenantWorkerPartition = `-- name: CreateTenantWorkerPartition :one
INSERT INTO "TenantWorkerPartition" ("id", "createdAt", "lastHeartbeat")
VALUES ($1::text, NOW(), NOW())
ON CONFLICT DO NOTHING
RETURNING id, "createdAt", "updatedAt", "lastHeartbeat"
`
func (q *Queries) CreateTenantWorkerPartition(ctx context.Context, db DBTX, id string) (*TenantWorkerPartition, error) {
row := db.QueryRow(ctx, createTenantWorkerPartition, id)
var i TenantWorkerPartition
err := row.Scan(
&i.ID,
&i.CreatedAt,
&i.UpdatedAt,
&i.LastHeartbeat,
)
return &i, err
}
const deleteControllerPartition = `-- name: DeleteControllerPartition :one
DELETE FROM "ControllerPartition"
WHERE "id" = $1::text
RETURNING id, "createdAt", "updatedAt", "lastHeartbeat"
`
func (q *Queries) DeleteControllerPartition(ctx context.Context, db DBTX, id string) (*ControllerPartition, error) {
row := db.QueryRow(ctx, deleteControllerPartition, id)
var i ControllerPartition
err := row.Scan(
&i.ID,
&i.CreatedAt,
&i.UpdatedAt,
&i.LastHeartbeat,
)
return &i, err
}
const deleteTenantWorkerPartition = `-- name: DeleteTenantWorkerPartition :one
DELETE FROM "TenantWorkerPartition"
WHERE "id" = $1::text
RETURNING id, "createdAt", "updatedAt", "lastHeartbeat"
`
func (q *Queries) DeleteTenantWorkerPartition(ctx context.Context, db DBTX, id string) (*TenantWorkerPartition, error) {
row := db.QueryRow(ctx, deleteTenantWorkerPartition, id)
var i TenantWorkerPartition
err := row.Scan(
&i.ID,
&i.CreatedAt,
&i.UpdatedAt,
&i.LastHeartbeat,
)
return &i, err
}
const getEmailGroups = `-- name: GetEmailGroups :many
SELECT
id, "createdAt", "updatedAt", "deletedAt", "tenantId", emails
@@ -145,7 +295,7 @@ func (q *Queries) GetTenantAlertingSettings(ctx context.Context, db DBTX, tenant
const getTenantByID = `-- name: GetTenantByID :one
SELECT
id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails"
id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId"
FROM
"Tenant" as tenants
WHERE
@@ -164,6 +314,8 @@ func (q *Queries) GetTenantByID(ctx context.Context, db DBTX, id pgtype.UUID) (*
&i.Slug,
&i.AnalyticsOptOut,
&i.AlertMemberEmails,
&i.ControllerPartitionId,
&i.WorkerPartitionId,
)
return &i, err
}
@@ -306,7 +458,7 @@ func (q *Queries) GetTenantWorkflowQueueMetrics(ctx context.Context, db DBTX, ar
const listTenants = `-- name: ListTenants :many
SELECT
id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails"
id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId"
FROM
"Tenant" as tenants
`
@@ -329,6 +481,8 @@ func (q *Queries) ListTenants(ctx context.Context, db DBTX) ([]*Tenant, error) {
&i.Slug,
&i.AnalyticsOptOut,
&i.AlertMemberEmails,
&i.ControllerPartitionId,
&i.WorkerPartitionId,
); err != nil {
return nil, err
}
@@ -340,6 +494,260 @@ func (q *Queries) ListTenants(ctx context.Context, db DBTX) ([]*Tenant, error) {
return items, nil
}
const listTenantsByControllerPartitionId = `-- name: ListTenantsByControllerPartitionId :many
WITH update_partition AS (
UPDATE
"ControllerPartition"
SET
"lastHeartbeat" = NOW()
WHERE
"id" = $1::text
)
SELECT
id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId"
FROM
"Tenant" as tenants
WHERE
"controllerPartitionId" = $1::text
`
func (q *Queries) ListTenantsByControllerPartitionId(ctx context.Context, db DBTX, controllerpartitionid string) ([]*Tenant, error) {
rows, err := db.Query(ctx, listTenantsByControllerPartitionId, controllerpartitionid)
if err != nil {
return nil, err
}
defer rows.Close()
var items []*Tenant
for rows.Next() {
var i Tenant
if err := rows.Scan(
&i.ID,
&i.CreatedAt,
&i.UpdatedAt,
&i.DeletedAt,
&i.Name,
&i.Slug,
&i.AnalyticsOptOut,
&i.AlertMemberEmails,
&i.ControllerPartitionId,
&i.WorkerPartitionId,
); err != nil {
return nil, err
}
items = append(items, &i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listTenantsByTenantWorkerPartitionId = `-- name: ListTenantsByTenantWorkerPartitionId :many
WITH update_partition AS (
UPDATE
"TenantWorkerPartition"
SET
"lastHeartbeat" = NOW()
WHERE
"id" = $1::text
)
SELECT
id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId"
FROM
"Tenant" as tenants
WHERE
"workerPartitionId" = $1::text
`
func (q *Queries) ListTenantsByTenantWorkerPartitionId(ctx context.Context, db DBTX, workerpartitionid string) ([]*Tenant, error) {
rows, err := db.Query(ctx, listTenantsByTenantWorkerPartitionId, workerpartitionid)
if err != nil {
return nil, err
}
defer rows.Close()
var items []*Tenant
for rows.Next() {
var i Tenant
if err := rows.Scan(
&i.ID,
&i.CreatedAt,
&i.UpdatedAt,
&i.DeletedAt,
&i.Name,
&i.Slug,
&i.AnalyticsOptOut,
&i.AlertMemberEmails,
&i.ControllerPartitionId,
&i.WorkerPartitionId,
); err != nil {
return nil, err
}
items = append(items, &i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const rebalanceAllControllerPartitions = `-- name: RebalanceAllControllerPartitions :exec
WITH active_partitions AS (
SELECT
"id"
FROM
"ControllerPartition"
WHERE
"lastHeartbeat" > NOW() - INTERVAL '1 minute'
)
UPDATE
"Tenant" as tenants
SET
"controllerPartitionId" = (
SELECT
"id"
FROM
active_partitions
ORDER BY
random()
LIMIT 1
)
WHERE
"slug" != 'internal' AND
(
"controllerPartitionId" IS NULL OR
"controllerPartitionId" NOT IN (SELECT "id" FROM active_partitions)
)
RETURNING id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId"
`
func (q *Queries) RebalanceAllControllerPartitions(ctx context.Context, db DBTX) error {
_, err := db.Exec(ctx, rebalanceAllControllerPartitions)
return err
}
const rebalanceAllTenantWorkerPartitions = `-- name: RebalanceAllTenantWorkerPartitions :exec
WITH active_partitions AS (
SELECT
"id"
FROM
"TenantWorkerPartition"
WHERE
"lastHeartbeat" > NOW() - INTERVAL '1 minute'
)
UPDATE
"Tenant" as tenants
SET
"workerPartitionId" = (
SELECT
"id"
FROM
active_partitions
ORDER BY
random()
LIMIT 1
)
WHERE
"slug" != 'internal' AND
(
"workerPartitionId" IS NULL OR
"workerPartitionId" NOT IN (SELECT "id" FROM active_partitions)
)
RETURNING id, "createdAt", "updatedAt", "deletedAt", name, slug, "analyticsOptOut", "alertMemberEmails", "controllerPartitionId", "workerPartitionId"
`
func (q *Queries) RebalanceAllTenantWorkerPartitions(ctx context.Context, db DBTX) error {
_, err := db.Exec(ctx, rebalanceAllTenantWorkerPartitions)
return err
}
const rebalanceInactiveControllerPartitions = `-- name: RebalanceInactiveControllerPartitions :exec
WITH active_partitions AS (
SELECT
"id"
FROM
"ControllerPartition"
WHERE
"lastHeartbeat" > NOW() - INTERVAL '1 minute'
), inactive_partitions AS (
SELECT
"id"
FROM
"ControllerPartition"
WHERE
"lastHeartbeat" <= NOW() - INTERVAL '1 minute'
), update_tenants AS (
UPDATE
"Tenant" as tenants
SET
"controllerPartitionId" = (
SELECT
"id"
FROM
active_partitions
ORDER BY
random()
LIMIT 1
)
WHERE
"slug" != 'internal' AND
(
"controllerPartitionId" IS NULL OR
"controllerPartitionId" IN (SELECT "id" FROM inactive_partitions)
)
)
DELETE FROM "ControllerPartition"
WHERE "id" IN (SELECT "id" FROM inactive_partitions)
`
func (q *Queries) RebalanceInactiveControllerPartitions(ctx context.Context, db DBTX) error {
_, err := db.Exec(ctx, rebalanceInactiveControllerPartitions)
return err
}
const rebalanceInactiveTenantWorkerPartitions = `-- name: RebalanceInactiveTenantWorkerPartitions :exec
WITH active_partitions AS (
SELECT
"id"
FROM
"TenantWorkerPartition"
WHERE
"lastHeartbeat" > NOW() - INTERVAL '1 minute'
), inactive_partitions AS (
SELECT
"id"
FROM
"TenantWorkerPartition"
WHERE
"lastHeartbeat" <= NOW() - INTERVAL '1 minute'
), update_tenants AS (
UPDATE
"Tenant" as tenants
SET
"workerPartitionId" = (
SELECT
"id"
FROM
active_partitions
ORDER BY
random()
LIMIT 1
)
WHERE
"slug" != 'internal' AND
(
"workerPartitionId" IS NULL OR
"workerPartitionId" IN (SELECT "id" FROM inactive_partitions)
)
)
DELETE FROM "TenantWorkerPartition"
WHERE "id" IN (SELECT "id" FROM inactive_partitions)
`
func (q *Queries) RebalanceInactiveTenantWorkerPartitions(ctx context.Context, db DBTX) error {
_, err := db.Exec(ctx, rebalanceInactiveTenantWorkerPartitions)
return err
}
const updateTenantAlertingSettings = `-- name: UpdateTenantAlertingSettings :one
UPDATE
"TenantAlertingSettings" as tenantAlertingSettings

View File

@@ -1,7 +1,22 @@
-- name: ListWebhookWorkers :many
-- name: ListWebhookWorkersByPartitionId :many
WITH tenants AS (
SELECT
"id"
FROM
"Tenant"
WHERE
"workerPartitionId" = sqlc.arg('workerPartitionId')::text
), update_partition AS (
UPDATE
"TenantWorkerPartition"
SET
"lastHeartbeat" = NOW()
WHERE
"id" = sqlc.arg('workerPartitionId')::text
)
SELECT *
FROM "WebhookWorker"
WHERE "tenantId" = @tenantId::uuid;
WHERE "tenantId" IN (SELECT "id" FROM tenants);
-- name: ListActiveWebhookWorkers :many
SELECT *

View File

@@ -66,14 +66,29 @@ func (q *Queries) ListActiveWebhookWorkers(ctx context.Context, db DBTX, tenanti
return items, nil
}
const listWebhookWorkers = `-- name: ListWebhookWorkers :many
const listWebhookWorkersByPartitionId = `-- name: ListWebhookWorkersByPartitionId :many
WITH tenants AS (
SELECT
"id"
FROM
"Tenant"
WHERE
"workerPartitionId" = $1::text
), update_partition AS (
UPDATE
"TenantWorkerPartition"
SET
"lastHeartbeat" = NOW()
WHERE
"id" = $1::text
)
SELECT id, "createdAt", "updatedAt", name, secret, url, "tokenValue", deleted, "tokenId", "tenantId"
FROM "WebhookWorker"
WHERE "tenantId" = $1::uuid
WHERE "tenantId" IN (SELECT "id" FROM tenants)
`
func (q *Queries) ListWebhookWorkers(ctx context.Context, db DBTX, tenantid pgtype.UUID) ([]*WebhookWorker, error) {
rows, err := db.Query(ctx, listWebhookWorkers, tenantid)
func (q *Queries) ListWebhookWorkersByPartitionId(ctx context.Context, db DBTX, workerpartitionid string) ([]*WebhookWorker, error) {
rows, err := db.Query(ctx, listWebhookWorkersByPartitionId, workerpartitionid)
if err != nil {
return nil, err
}

View File

@@ -3,6 +3,7 @@ package prisma
import (
"context"
"encoding/json"
"fmt"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgtype"
@@ -39,7 +40,7 @@ func NewTenantAPIRepository(pool *pgxpool.Pool, client *db.PrismaClient, v valid
}
}
func (r *tenantAPIRepository) CreateTenant(opts *repository.CreateTenantOpts) (*db.TenantModel, error) {
func (r *tenantAPIRepository) CreateTenant(opts *repository.CreateTenantOpts) (*dbsqlc.Tenant, error) {
if err := r.v.Validate(opts); err != nil {
return nil, err
}
@@ -50,28 +51,35 @@ func (r *tenantAPIRepository) CreateTenant(opts *repository.CreateTenantOpts) (*
tenantId = *opts.ID
}
createTenantTx := r.client.Tenant.CreateOne(
db.Tenant.Name.Set(opts.Name),
db.Tenant.Slug.Set(opts.Slug),
db.Tenant.ID.Set(tenantId),
).Tx()
createSettingsTx := r.client.TenantAlertingSettings.CreateOne(
db.TenantAlertingSettings.Tenant.Link(
db.Tenant.ID.Equals(tenantId),
),
).Tx()
err := r.client.Prisma.Transaction(
createTenantTx,
createSettingsTx,
).Exec(context.Background())
tx, err := r.pool.Begin(context.Background())
if err != nil {
return nil, err
}
return createTenantTx.Result(), nil
defer deferRollback(context.Background(), r.l, tx.Rollback)
createTenant, err := r.queries.CreateTenant(context.Background(), tx, dbsqlc.CreateTenantParams{
ID: sqlchelpers.UUIDFromStr(tenantId),
Slug: opts.Slug,
Name: opts.Name,
})
if err != nil {
return nil, err
}
_, err = r.queries.CreateTenantAlertingSettings(context.Background(), tx, sqlchelpers.UUIDFromStr(tenantId))
if err != nil {
return nil, err
}
if err := tx.Commit(context.Background()); err != nil {
return nil, err
}
return createTenant, nil
}
func (r *tenantAPIRepository) UpdateTenant(id string, opts *repository.UpdateTenantOpts) (*db.TenantModel, error) {
@@ -284,3 +292,55 @@ func (r *tenantEngineRepository) GetTenantByID(ctx context.Context, tenantId str
return r.queries.GetTenantByID(ctx, r.pool, sqlchelpers.UUIDFromStr(tenantId))
})
}
func (r *tenantEngineRepository) ListTenantsByControllerPartition(ctx context.Context, controllerPartitionId string) ([]*dbsqlc.Tenant, error) {
if controllerPartitionId == "" {
return nil, fmt.Errorf("partitionId is required")
}
return r.queries.ListTenantsByControllerPartitionId(ctx, r.pool, controllerPartitionId)
}
func (r *tenantEngineRepository) ListTenantsByWorkerPartition(ctx context.Context, workerPartitionId string) ([]*dbsqlc.Tenant, error) {
if workerPartitionId == "" {
return nil, fmt.Errorf("partitionId is required")
}
return r.queries.ListTenantsByTenantWorkerPartitionId(ctx, r.pool, workerPartitionId)
}
func (r *tenantEngineRepository) CreateControllerPartition(ctx context.Context, id string) error {
_, err := r.queries.CreateControllerPartition(ctx, r.pool, id)
return err
}
func (r *tenantEngineRepository) DeleteControllerPartition(ctx context.Context, id string) error {
_, err := r.queries.DeleteControllerPartition(ctx, r.pool, id)
return err
}
func (r *tenantEngineRepository) RebalanceAllControllerPartitions(ctx context.Context) error {
return r.queries.RebalanceAllControllerPartitions(ctx, r.pool)
}
func (r *tenantEngineRepository) RebalanceInactiveControllerPartitions(ctx context.Context) error {
return r.queries.RebalanceInactiveControllerPartitions(ctx, r.pool)
}
func (r *tenantEngineRepository) CreateTenantWorkerPartition(ctx context.Context, id string) error {
_, err := r.queries.CreateTenantWorkerPartition(ctx, r.pool, id)
return err
}
func (r *tenantEngineRepository) DeleteTenantWorkerPartition(ctx context.Context, id string) error {
_, err := r.queries.DeleteTenantWorkerPartition(ctx, r.pool, id)
return err
}
func (r *tenantEngineRepository) RebalanceAllTenantWorkerPartitions(ctx context.Context) error {
return r.queries.RebalanceAllTenantWorkerPartitions(ctx, r.pool)
}
func (r *tenantEngineRepository) RebalanceInactiveTenantWorkerPartitions(ctx context.Context) error {
return r.queries.RebalanceInactiveTenantWorkerPartitions(ctx, r.pool)
}

View File

@@ -2,6 +2,7 @@ package prisma
import (
"context"
"fmt"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/rs/zerolog"
@@ -30,8 +31,12 @@ func NewWebhookWorkerEngineRepository(pool *pgxpool.Pool, v validator.Validator,
}
}
func (r *webhookWorkerEngineRepository) ListWebhookWorkers(ctx context.Context, tenantId string) ([]*dbsqlc.WebhookWorker, error) {
return r.queries.ListWebhookWorkers(ctx, r.pool, sqlchelpers.UUIDFromStr(tenantId))
func (r *webhookWorkerEngineRepository) ListWebhookWorkersByPartitionId(ctx context.Context, partitionId string) ([]*dbsqlc.WebhookWorker, error) {
if partitionId == "" {
return nil, fmt.Errorf("partitionId is required")
}
return r.queries.ListWebhookWorkersByPartitionId(ctx, r.pool, partitionId)
}
func (r *webhookWorkerEngineRepository) ListActiveWebhookWorkers(ctx context.Context, tenantId string) ([]*dbsqlc.WebhookWorker, error) {

View File

@@ -62,7 +62,7 @@ type GetQueueMetricsResponse struct {
type TenantAPIRepository interface {
// CreateTenant creates a new tenant.
CreateTenant(opts *CreateTenantOpts) (*db.TenantModel, error)
CreateTenant(opts *CreateTenantOpts) (*dbsqlc.Tenant, error)
// CreateTenant creates a new tenant.
UpdateTenant(tenantId string, opts *UpdateTenantOpts) (*db.TenantModel, error)
@@ -102,6 +102,28 @@ type TenantEngineRepository interface {
// ListTenants lists all tenants in the instance
ListTenants(ctx context.Context) ([]*dbsqlc.Tenant, error)
// ListTenantsByPartition lists all tenants in the given partition
ListTenantsByControllerPartition(ctx context.Context, controllerPartitionId string) ([]*dbsqlc.Tenant, error)
ListTenantsByWorkerPartition(ctx context.Context, workerPartitionId string) ([]*dbsqlc.Tenant, error)
// CreateEnginePartition creates a new partition for tenants within the engine
CreateControllerPartition(ctx context.Context, id string) error
DeleteControllerPartition(ctx context.Context, id string) error
RebalanceAllControllerPartitions(ctx context.Context) error
RebalanceInactiveControllerPartitions(ctx context.Context) error
CreateTenantWorkerPartition(ctx context.Context, id string) error
DeleteTenantWorkerPartition(ctx context.Context, id string) error
RebalanceAllTenantWorkerPartitions(ctx context.Context) error
RebalanceInactiveTenantWorkerPartitions(ctx context.Context) error
// GetTenantByID returns the tenant with the given id
GetTenantByID(ctx context.Context, tenantId string) (*dbsqlc.Tenant, error)
}

View File

@@ -17,8 +17,8 @@ type UpsertWebhookWorkerOpts struct {
}
type WebhookWorkerEngineRepository interface {
// ListWebhookWorkers returns the list of webhook workers for the given tenant
ListWebhookWorkers(ctx context.Context, tenantId string) ([]*dbsqlc.WebhookWorker, error)
// ListWebhookWorkersByPartitionId returns the list of webhook workers for a worker partition
ListWebhookWorkersByPartitionId(ctx context.Context, partitionId string) ([]*dbsqlc.WebhookWorker, error)
// ListActiveWebhookWorkers returns the list of active webhook workers for the given tenant
ListActiveWebhookWorkers(ctx context.Context, tenantId string) ([]*dbsqlc.WebhookWorker, error)

View File

@@ -1,9 +0,0 @@
-- CreateTable
CREATE TABLE "SecurityCheckIdent" (
"id" UUID NOT NULL,
CONSTRAINT "SecurityCheckIdent_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE UNIQUE INDEX "SecurityCheckIdent_id_key" ON "SecurityCheckIdent"("id");

View File

@@ -0,0 +1,51 @@
-- AlterTable
ALTER TABLE "Tenant" ADD COLUMN "controllerPartitionId" TEXT,
ADD COLUMN "workerPartitionId" TEXT;
-- CreateTable
CREATE TABLE "ControllerPartition" (
"id" TEXT NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"lastHeartbeat" TIMESTAMP(3),
CONSTRAINT "ControllerPartition_pkey" PRIMARY KEY ("id")
);
-- CreateTable
CREATE TABLE "TenantWorkerPartition" (
"id" TEXT NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"lastHeartbeat" TIMESTAMP(3),
CONSTRAINT "TenantWorkerPartition_pkey" PRIMARY KEY ("id")
);
-- CreateTable
CREATE TABLE "SecurityCheckIdent" (
"id" UUID NOT NULL,
CONSTRAINT "SecurityCheckIdent_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE UNIQUE INDEX "ControllerPartition_id_key" ON "ControllerPartition"("id");
-- CreateIndex
CREATE UNIQUE INDEX "TenantWorkerPartition_id_key" ON "TenantWorkerPartition"("id");
-- CreateIndex
CREATE UNIQUE INDEX "SecurityCheckIdent_id_key" ON "SecurityCheckIdent"("id");
-- CreateIndex
CREATE INDEX "Tenant_controllerPartitionId_idx" ON "Tenant"("controllerPartitionId");
-- CreateIndex
CREATE INDEX "Tenant_workerPartitionId_idx" ON "Tenant"("workerPartitionId");
-- AddForeignKey
ALTER TABLE "Tenant" ADD CONSTRAINT "Tenant_controllerPartitionId_fkey" FOREIGN KEY ("controllerPartitionId") REFERENCES "ControllerPartition"("id") ON DELETE SET NULL ON UPDATE SET NULL;
-- AddForeignKey
ALTER TABLE "Tenant" ADD CONSTRAINT "Tenant_workerPartitionId_fkey" FOREIGN KEY ("workerPartitionId") REFERENCES "TenantWorkerPartition"("id") ON DELETE SET NULL ON UPDATE SET NULL;

View File

@@ -124,6 +124,30 @@ model WebhookWorkerWorkflow {
@@unique([webhookWorkerId, workflowId])
}
// ControllerPartition represents an engine instance that only handles a subset of tenants. This is used for list
// operations across tenants. If tenants do not have a partition, they are included in all partitions.
model ControllerPartition {
id String @id @unique
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
// lastHeartbeat is used for rebalancing partitions
lastHeartbeat DateTime?
tenants Tenant[]
}
model TenantWorkerPartition {
id String @id @unique
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
// lastHeartbeat is used for rebalancing partitions
lastHeartbeat DateTime?
tenants Tenant[]
}
// Tenant represents a unique tenant in the database. Each tenant-scoped resource should have the tenant as
// an identifier, which makes tenant isolation easier.
model Tenant {
@@ -139,6 +163,14 @@ model Tenant {
// wheather the user has opted out of analytics
analyticsOptOut Boolean @default(false)
// the parent controller partition, if exists
controllerPartition ControllerPartition? @relation(fields: [controllerPartitionId], references: [id], onDelete: SetNull, onUpdate: SetNull)
controllerPartitionId String?
// the parent worker partition, if exists
workerPartition TenantWorkerPartition? @relation(fields: [workerPartitionId], references: [id], onDelete: SetNull, onUpdate: SetNull)
workerPartitionId String?
events Event[]
workflows Workflow[]
jobs Job[]
@@ -171,6 +203,9 @@ model Tenant {
limits TenantResourceLimit[]
limitAlerts TenantResourceLimitAlert[]
webhookWorkers WebhookWorker[]
@@index([controllerPartitionId])
@@index([workerPartitionId])
}
enum LimitResource {

View File

@@ -1,6 +0,0 @@
-- Create "SecurityCheckIdent" table
CREATE TABLE "SecurityCheckIdent" ("id" uuid NOT NULL, PRIMARY KEY ("id"));
-- Create index "SecurityCheckIdent_id_key" to table: "SecurityCheckIdent"
CREATE UNIQUE INDEX "SecurityCheckIdent_id_key" ON "SecurityCheckIdent" ("id");
-- Insert Default Ident
INSERT INTO "SecurityCheckIdent" ("id") VALUES (gen_random_uuid());

View File

@@ -0,0 +1,20 @@
-- Create "ControllerPartition" table
CREATE TABLE "ControllerPartition" ("id" text NOT NULL, "createdAt" timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, "updatedAt" timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, "lastHeartbeat" timestamp(3) NULL, PRIMARY KEY ("id"));
-- Create index "ControllerPartition_id_key" to table: "ControllerPartition"
CREATE UNIQUE INDEX "ControllerPartition_id_key" ON "ControllerPartition" ("id");
-- Create "SecurityCheckIdent" table
CREATE TABLE "SecurityCheckIdent" ("id" uuid NOT NULL, PRIMARY KEY ("id"));
-- Create index "SecurityCheckIdent_id_key" to table: "SecurityCheckIdent"
CREATE UNIQUE INDEX "SecurityCheckIdent_id_key" ON "SecurityCheckIdent" ("id");
-- Create "TenantWorkerPartition" table
CREATE TABLE "TenantWorkerPartition" ("id" text NOT NULL, "createdAt" timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, "updatedAt" timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, "lastHeartbeat" timestamp(3) NULL, PRIMARY KEY ("id"));
-- Create index "TenantWorkerPartition_id_key" to table: "TenantWorkerPartition"
CREATE UNIQUE INDEX "TenantWorkerPartition_id_key" ON "TenantWorkerPartition" ("id");
-- Modify "Tenant" table
ALTER TABLE "Tenant" ADD COLUMN "controllerPartitionId" text NULL, ADD COLUMN "workerPartitionId" text NULL, ADD CONSTRAINT "Tenant_controllerPartitionId_fkey" FOREIGN KEY ("controllerPartitionId") REFERENCES "ControllerPartition" ("id") ON UPDATE SET NULL ON DELETE SET NULL, ADD CONSTRAINT "Tenant_workerPartitionId_fkey" FOREIGN KEY ("workerPartitionId") REFERENCES "TenantWorkerPartition" ("id") ON UPDATE SET NULL ON DELETE SET NULL;
-- Create index "Tenant_controllerPartitionId_idx" to table: "Tenant"
CREATE INDEX "Tenant_controllerPartitionId_idx" ON "Tenant" ("controllerPartitionId");
-- Create index "Tenant_workerPartitionId_idx" to table: "Tenant"
CREATE INDEX "Tenant_workerPartitionId_idx" ON "Tenant" ("workerPartitionId");
INSERT INTO "SecurityCheckIdent" ("id") VALUES (gen_random_uuid());

View File

@@ -1,4 +1,4 @@
h1:SgNqZT++7yKEJ3fqsLNmn+nm/JhKU9+HDAInq0XAtI4=
h1:NK+ekkECiuadzun2bA8DNGD9zGTMM8+f44rPVLThpJs=
20240115180414_init.sql h1:Ef3ZyjAHkmJPdGF/dEWCahbwgcg6uGJKnDxW2JCRi2k=
20240122014727_v0_6_0.sql h1:o/LdlteAeFgoHJ3e/M4Xnghqt9826IE/Y/h0q95Acuo=
20240126235456_v0_7_0.sql h1:KiVzt/hXgQ6esbdC6OMJOOWuYEXmy1yeCpmsVAHTFKs=
@@ -33,4 +33,4 @@ h1:SgNqZT++7yKEJ3fqsLNmn+nm/JhKU9+HDAInq0XAtI4=
20240531200418_v0_30_1.sql h1:jPAKmGkP0Ecq1mUk9o2qr5S0fEV46oXicdlGh1TmBQg=
20240606145243_v0_31_0.sql h1:ALisDQv8IPGe6MiBSfE/Esdl5x4pzNHIVMavlsBXIPE=
20240625180548_v0.34.0.sql h1:77uSk0VF/jBvEPHCqWC4hmMQqUx4zVnMdTryGsIXt9s=
20240626195645_v0_35_0.sql h1:iBWeeBHZpNkUGzfg1z6k7Jy1RvuXiUPhH09Nmp6bZtQ=
20240626204339_v0.34.2.sql h1:e2hArnEfcEYcBjEPxZW3axkl4CGt2lHa1oIA2r2fjfY=

View File

@@ -61,6 +61,16 @@ CREATE TABLE "Action" (
CONSTRAINT "Action_pkey" PRIMARY KEY ("id")
);
-- CreateTable
CREATE TABLE "ControllerPartition" (
"id" TEXT NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"lastHeartbeat" TIMESTAMP(3),
CONSTRAINT "ControllerPartition_pkey" PRIMARY KEY ("id")
);
-- CreateTable
CREATE TABLE "Dispatcher" (
"id" UUID NOT NULL,
@@ -350,6 +360,8 @@ CREATE TABLE "Tenant" (
"slug" TEXT NOT NULL,
"analyticsOptOut" BOOLEAN NOT NULL DEFAULT false,
"alertMemberEmails" BOOLEAN NOT NULL DEFAULT true,
"controllerPartitionId" TEXT,
"workerPartitionId" TEXT,
CONSTRAINT "Tenant_pkey" PRIMARY KEY ("id")
);
@@ -455,6 +467,16 @@ CREATE TABLE "TenantVcsProvider" (
CONSTRAINT "TenantVcsProvider_pkey" PRIMARY KEY ("id")
);
-- CreateTable
CREATE TABLE "TenantWorkerPartition" (
"id" TEXT NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"lastHeartbeat" TIMESTAMP(3),
CONSTRAINT "TenantWorkerPartition_pkey" PRIMARY KEY ("id")
);
-- CreateTable
CREATE TABLE "Ticker" (
"id" UUID NOT NULL,
@@ -744,6 +766,9 @@ CREATE UNIQUE INDEX "Action_id_key" ON "Action"("id" ASC);
-- CreateIndex
CREATE UNIQUE INDEX "Action_tenantId_actionId_key" ON "Action"("tenantId" ASC, "actionId" ASC);
-- CreateIndex
CREATE UNIQUE INDEX "ControllerPartition_id_key" ON "ControllerPartition"("id" ASC);
-- CreateIndex
CREATE UNIQUE INDEX "Dispatcher_id_key" ON "Dispatcher"("id" ASC);
@@ -837,12 +862,18 @@ CREATE INDEX "StepRunEvent_stepRunId_idx" ON "StepRunEvent"("stepRunId" ASC);
-- CreateIndex
CREATE UNIQUE INDEX "StepRunResultArchive_id_key" ON "StepRunResultArchive"("id" ASC);
-- CreateIndex
CREATE INDEX "Tenant_controllerPartitionId_idx" ON "Tenant"("controllerPartitionId" ASC);
-- CreateIndex
CREATE UNIQUE INDEX "Tenant_id_key" ON "Tenant"("id" ASC);
-- CreateIndex
CREATE UNIQUE INDEX "Tenant_slug_key" ON "Tenant"("slug" ASC);
-- CreateIndex
CREATE INDEX "Tenant_workerPartitionId_idx" ON "Tenant"("workerPartitionId" ASC);
-- CreateIndex
CREATE UNIQUE INDEX "TenantAlertEmailGroup_id_key" ON "TenantAlertEmailGroup"("id" ASC);
@@ -876,6 +907,9 @@ CREATE UNIQUE INDEX "TenantVcsProvider_id_key" ON "TenantVcsProvider"("id" ASC);
-- CreateIndex
CREATE UNIQUE INDEX "TenantVcsProvider_tenantId_vcsProvider_key" ON "TenantVcsProvider"("tenantId" ASC, "vcsProvider" ASC);
-- CreateIndex
CREATE UNIQUE INDEX "TenantWorkerPartition_id_key" ON "TenantWorkerPartition"("id" ASC);
-- CreateIndex
CREATE UNIQUE INDEX "Ticker_id_key" ON "Ticker"("id" ASC);
@@ -1125,6 +1159,12 @@ ALTER TABLE "StreamEvent" ADD CONSTRAINT "StreamEvent_stepRunId_fkey" FOREIGN KE
-- AddForeignKey
ALTER TABLE "StreamEvent" ADD CONSTRAINT "StreamEvent_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "Tenant" ADD CONSTRAINT "Tenant_controllerPartitionId_fkey" FOREIGN KEY ("controllerPartitionId") REFERENCES "ControllerPartition"("id") ON DELETE SET NULL ON UPDATE SET NULL;
-- AddForeignKey
ALTER TABLE "Tenant" ADD CONSTRAINT "Tenant_workerPartitionId_fkey" FOREIGN KEY ("workerPartitionId") REFERENCES "TenantWorkerPartition"("id") ON DELETE SET NULL ON UPDATE SET NULL;
-- AddForeignKey
ALTER TABLE "TenantAlertEmailGroup" ADD CONSTRAINT "TenantAlertEmailGroup_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;