From e1fdeeaf1c2b4b975fc3af09ee4f3acad0d1a10b Mon Sep 17 00:00:00 2001 From: abelanger5 Date: Thu, 23 Oct 2025 17:45:49 -0400 Subject: [PATCH] fix: payload performance (#2441) * change some olap flush settings * increase timeouts for payload wal * fix: improve performance of payload wal metrics * slight updates * more small tweaks * undo some olap changes, don't offload some payloads * remove double reads * try reducing wal poll limit * analyze v1_dag * move partition method --- .../migrations/20251022151234_v1_0_52.sql | 111 ++++++++++++++++++ .../controllers/v1/olap/controller.go | 39 +++--- pkg/config/server/server.go | 2 +- pkg/repository/v1/olap.go | 42 +++---- pkg/repository/v1/payloadstore.go | 15 ++- pkg/repository/v1/sqlcv1/olap.sql | 6 +- pkg/repository/v1/sqlcv1/olap.sql.go | 6 +- pkg/repository/v1/sqlcv1/payload-store.sql | 32 +++-- pkg/repository/v1/sqlcv1/payload-store.sql.go | 56 +++++++-- pkg/repository/v1/sqlcv1/tasks.sql | 3 + pkg/repository/v1/sqlcv1/tasks.sql.go | 9 ++ pkg/repository/v1/task.go | 6 + sql/schema/v1-core.sql | 22 +++- 13 files changed, 268 insertions(+), 81 deletions(-) create mode 100644 cmd/hatchet-migrate/migrate/migrations/20251022151234_v1_0_52.sql diff --git a/cmd/hatchet-migrate/migrate/migrations/20251022151234_v1_0_52.sql b/cmd/hatchet-migrate/migrate/migrations/20251022151234_v1_0_52.sql new file mode 100644 index 000000000..ca88bf88f --- /dev/null +++ b/cmd/hatchet-migrate/migrate/migrations/20251022151234_v1_0_52.sql @@ -0,0 +1,111 @@ +-- +goose Up +-- +goose StatementBegin +CREATE OR REPLACE FUNCTION find_matching_tenants_in_payload_wal_partition( + partition_number INT +) RETURNS UUID[] +LANGUAGE plpgsql AS +$$ +DECLARE + partition_table text; + result UUID[]; +BEGIN + partition_table := 'v1_payload_wal_' || partition_number::text; + + EXECUTE format( + 'SELECT ARRAY( + SELECT t.id + FROM "Tenant" t + WHERE EXISTS ( + SELECT 1 + FROM %I e + WHERE e.tenant_id = t.id + LIMIT 1 + ) + )', + partition_table) + INTO result; + + RETURN result; +END; +$$; + +CREATE OR REPLACE FUNCTION find_matching_tenants_in_payload_cutover_queue_item_partition( + partition_number INT +) RETURNS UUID[] +LANGUAGE plpgsql AS +$$ +DECLARE + partition_table text; + result UUID[]; +BEGIN + partition_table := 'v1_payload_cutover_queue_item_' || partition_number::text; + + EXECUTE format( + 'SELECT ARRAY( + SELECT t.id + FROM "Tenant" t + WHERE EXISTS ( + SELECT 1 + FROM %I e + WHERE e.tenant_id = t.id + AND e.cut_over_at <= NOW() + LIMIT 1 + ) + )', + partition_table) + INTO result; + + RETURN result; +END; +$$; +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +CREATE OR REPLACE FUNCTION find_matching_tenants_in_payload_wal_partition( + partition_number INT +) RETURNS UUID[] +LANGUAGE plpgsql AS +$$ +DECLARE + partition_table text; + result UUID[]; +BEGIN + partition_table := 'v1_payload_wal_' || partition_number::text; + + EXECUTE format( + 'SELECT ARRAY( + SELECT DISTINCT e.tenant_id + FROM %I e + )', + partition_table) + INTO result; + + RETURN result; +END; +$$; + +CREATE OR REPLACE FUNCTION find_matching_tenants_in_payload_cutover_queue_item_partition( + partition_number INT +) RETURNS UUID[] +LANGUAGE plpgsql AS +$$ +DECLARE + partition_table text; + result UUID[]; +BEGIN + partition_table := 'v1_payload_cutover_queue_item_' || partition_number::text; + + EXECUTE format( + 'SELECT ARRAY( + SELECT DISTINCT e.tenant_id + FROM %I e + WHERE e.cut_over_at <= NOW() + )', + partition_table) + INTO result; + + RETURN result; +END; +$$; +-- +goose StatementEnd diff --git a/internal/services/controllers/v1/olap/controller.go b/internal/services/controllers/v1/olap/controller.go index 44ee55898..cad5f54f9 100644 --- a/internal/services/controllers/v1/olap/controller.go +++ b/internal/services/controllers/v1/olap/controller.go @@ -227,6 +227,7 @@ func (o *OLAPControllerImpl) Start() (func() error, error) { o.s.Start() mqBuffer := msgqueue.NewMQSubBuffer(msgqueue.OLAP_QUEUE, heavyReadMQ, o.handleBufferedMsgs) + wg := sync.WaitGroup{} startupPartitionCtx, cancelStartupPartition := context.WithTimeout(context.Background(), 30*time.Second) @@ -715,31 +716,31 @@ func (tc *OLAPControllerImpl) handleCreateMonitoringEvent(ctx context.Context, t return nil } - retrieveOptsToKey, err := tc.repo.OLAP().PayloadStore().ExternalStore().Store(ctx, offloadToExternalOpts...) + // retrieveOptsToKey, err := tc.repo.OLAP().PayloadStore().ExternalStore().Store(ctx, offloadToExternalOpts...) - if err != nil { - return err - } + // if err != nil { + // return err + // } - offloadOpts := make([]v1.OffloadPayloadOpts, 0) + // offloadOpts := make([]v1.OffloadPayloadOpts, 0) - for opt, key := range retrieveOptsToKey { - externalId := idInsertedAtToExternalId[v1.IdInsertedAt{ - ID: opt.Id, - InsertedAt: opt.InsertedAt, - }] + // for opt, key := range retrieveOptsToKey { + // externalId := idInsertedAtToExternalId[v1.IdInsertedAt{ + // ID: opt.Id, + // InsertedAt: opt.InsertedAt, + // }] - offloadOpts = append(offloadOpts, v1.OffloadPayloadOpts{ - ExternalId: externalId, - ExternalLocationKey: string(key), - }) - } + // offloadOpts = append(offloadOpts, v1.OffloadPayloadOpts{ + // ExternalId: externalId, + // ExternalLocationKey: string(key), + // }) + // } - err = tc.repo.OLAP().OffloadPayloads(ctx, tenantId, offloadOpts) + // err = tc.repo.OLAP().OffloadPayloads(ctx, tenantId, offloadOpts) - if err != nil { - return err - } + // if err != nil { + // return err + // } return nil } diff --git a/pkg/config/server/server.go b/pkg/config/server/server.go index ed907d609..bc0a0a225 100644 --- a/pkg/config/server/server.go +++ b/pkg/config/server/server.go @@ -623,7 +623,7 @@ type PayloadStoreConfig struct { EnableTaskEventPayloadDualWrites bool `mapstructure:"enableTaskEventPayloadDualWrites" json:"enableTaskEventPayloadDualWrites,omitempty" default:"true"` EnableDagDataPayloadDualWrites bool `mapstructure:"enableDagDataPayloadDualWrites" json:"enableDagDataPayloadDualWrites,omitempty" default:"true"` EnableOLAPPayloadDualWrites bool `mapstructure:"enableOLAPPayloadDualWrites" json:"enableOLAPPayloadDualWrites,omitempty" default:"true"` - WALPollLimit int `mapstructure:"walPollLimit" json:"walPollLimit,omitempty" default:"1000"` + WALPollLimit int `mapstructure:"walPollLimit" json:"walPollLimit,omitempty" default:"100"` WALProcessInterval time.Duration `mapstructure:"walProcessInterval" json:"walProcessInterval,omitempty" default:"15s"` ExternalCutoverProcessInterval time.Duration `mapstructure:"externalCutoverProcessInterval" json:"externalCutoverProcessInterval,omitempty" default:"15s"` } diff --git a/pkg/repository/v1/olap.go b/pkg/repository/v1/olap.go index 7646554fe..870d2d61a 100644 --- a/pkg/repository/v1/olap.go +++ b/pkg/repository/v1/olap.go @@ -2084,33 +2084,33 @@ func (r *OLAPRepositoryImpl) BulkCreateEventsAndTriggers(ctx context.Context, ev return nil } - retrieveOptsToKey, err := r.PayloadStore().ExternalStore().Store(ctx, offloadToExternalOpts...) + // retrieveOptsToKey, err := r.PayloadStore().ExternalStore().Store(ctx, offloadToExternalOpts...) - if err != nil { - return err - } + // if err != nil { + // return err + // } - tenantIdToffloadOpts := make(map[string][]OffloadPayloadOpts) + // tenantIdToffloadOpts := make(map[string][]OffloadPayloadOpts) - for opt, key := range retrieveOptsToKey { - externalId := idInsertedAtToExternalId[IdInsertedAt{ - ID: opt.Id, - InsertedAt: opt.InsertedAt, - }] + // for opt, key := range retrieveOptsToKey { + // externalId := idInsertedAtToExternalId[IdInsertedAt{ + // ID: opt.Id, + // InsertedAt: opt.InsertedAt, + // }] - tenantIdToffloadOpts[opt.TenantId.String()] = append(tenantIdToffloadOpts[opt.TenantId.String()], OffloadPayloadOpts{ - ExternalId: externalId, - ExternalLocationKey: string(key), - }) - } + // tenantIdToffloadOpts[opt.TenantId.String()] = append(tenantIdToffloadOpts[opt.TenantId.String()], OffloadPayloadOpts{ + // ExternalId: externalId, + // ExternalLocationKey: string(key), + // }) + // } - for tenantId, opts := range tenantIdToffloadOpts { - err = r.OffloadPayloads(ctx, tenantId, opts) + // for tenantId, opts := range tenantIdToffloadOpts { + // err = r.OffloadPayloads(ctx, tenantId, opts) - if err != nil { - return fmt.Errorf("error offloading payloads: %v", err) - } - } + // if err != nil { + // return fmt.Errorf("error offloading payloads: %v", err) + // } + // } if len(offloadToExternalOpts) == 0 { return nil diff --git a/pkg/repository/v1/payloadstore.go b/pkg/repository/v1/payloadstore.go index 68bddf79e..e1b2addf2 100644 --- a/pkg/repository/v1/payloadstore.go +++ b/pkg/repository/v1/payloadstore.go @@ -310,7 +310,7 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadWAL(ctx context.Context, part ctx, span := telemetry.NewSpan(ctx, "payloadstore.process_payload_wal") defer span.End() - tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l, 5000) + tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l, 30000) if err != nil { return false, fmt.Errorf("failed to prepare transaction: %w", err) @@ -345,6 +345,7 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadWAL(ctx context.Context, part retrieveOpts := make([]RetrievePayloadOpts, len(walRecords)) retrieveOptsToOffloadAt := make(map[RetrievePayloadOpts]pgtype.Timestamptz) + retrieveOptsToPayload := make(map[RetrievePayloadOpts][]byte) for i, record := range walRecords { opts := RetrievePayloadOpts{ @@ -356,12 +357,10 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadWAL(ctx context.Context, part retrieveOpts[i] = opts retrieveOptsToOffloadAt[opts] = record.OffloadAt - } - payloads, err := p.retrieve(ctx, tx, retrieveOpts...) - - if err != nil { - return false, err + if record.Location == sqlcv1.V1PayloadLocationINLINE { + retrieveOptsToPayload[opts] = record.InlineContent + } } externalStoreOpts := make([]OffloadToExternalStoreOpts, 0) @@ -397,7 +396,7 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadWAL(ctx context.Context, part Id: opts.Id, InsertedAt: opts.InsertedAt, Type: opts.Type, - Payload: payloads[opts], + Payload: retrieveOptsToPayload[opts], TenantId: opts.TenantId.String(), }, OffloadAt: offloadAt.Time, @@ -513,7 +512,7 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadExternalCutovers(ctx context. ctx, span := telemetry.NewSpan(ctx, "payloadstore.process_payload_external_cutovers") defer span.End() - tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l, 5000) + tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l, 30000) if err != nil { return false, fmt.Errorf("failed to prepare transaction: %w", err) diff --git a/pkg/repository/v1/sqlcv1/olap.sql b/pkg/repository/v1/sqlcv1/olap.sql index d56a89235..955fea787 100644 --- a/pkg/repository/v1/sqlcv1/olap.sql +++ b/pkg/repository/v1/sqlcv1/olap.sql @@ -4,7 +4,8 @@ SELECT create_v1_hash_partitions('v1_task_status_updates_tmp'::text, @partitions::int), create_v1_olap_partition_with_date_and_status('v1_tasks_olap'::text, @date::date), create_v1_olap_partition_with_date_and_status('v1_runs_olap'::text, @date::date), - create_v1_olap_partition_with_date_and_status('v1_dags_olap'::text, @date::date) + create_v1_olap_partition_with_date_and_status('v1_dags_olap'::text, @date::date), + create_v1_range_partition('v1_payloads_olap'::text, @date::date) ; -- name: CreateOLAPEventPartitions :exec @@ -13,8 +14,7 @@ SELECT create_v1_range_partition('v1_event_to_run_olap'::text, @date::date), create_v1_weekly_range_partition('v1_event_lookup_table_olap'::text, @date::date), create_v1_range_partition('v1_incoming_webhook_validation_failures_olap'::text, @date::date), - create_v1_range_partition('v1_cel_evaluation_failures_olap'::text, @date::date), - create_v1_range_partition('v1_payloads_olap'::text, @date::date) + create_v1_range_partition('v1_cel_evaluation_failures_olap'::text, @date::date) ; -- name: AnalyzeV1RunsOLAP :exec diff --git a/pkg/repository/v1/sqlcv1/olap.sql.go b/pkg/repository/v1/sqlcv1/olap.sql.go index 063ff3618..4f9300c49 100644 --- a/pkg/repository/v1/sqlcv1/olap.sql.go +++ b/pkg/repository/v1/sqlcv1/olap.sql.go @@ -200,8 +200,7 @@ SELECT create_v1_range_partition('v1_event_to_run_olap'::text, $1::date), create_v1_weekly_range_partition('v1_event_lookup_table_olap'::text, $1::date), create_v1_range_partition('v1_incoming_webhook_validation_failures_olap'::text, $1::date), - create_v1_range_partition('v1_cel_evaluation_failures_olap'::text, $1::date), - create_v1_range_partition('v1_payloads_olap'::text, $1::date) + create_v1_range_partition('v1_cel_evaluation_failures_olap'::text, $1::date) ` func (q *Queries) CreateOLAPEventPartitions(ctx context.Context, db DBTX, date pgtype.Date) error { @@ -215,7 +214,8 @@ SELECT create_v1_hash_partitions('v1_task_status_updates_tmp'::text, $1::int), create_v1_olap_partition_with_date_and_status('v1_tasks_olap'::text, $2::date), create_v1_olap_partition_with_date_and_status('v1_runs_olap'::text, $2::date), - create_v1_olap_partition_with_date_and_status('v1_dags_olap'::text, $2::date) + create_v1_olap_partition_with_date_and_status('v1_dags_olap'::text, $2::date), + create_v1_range_partition('v1_payloads_olap'::text, $2::date) ` type CreateOLAPPartitionsParams struct { diff --git a/pkg/repository/v1/sqlcv1/payload-store.sql b/pkg/repository/v1/sqlcv1/payload-store.sql index d11781ecd..b7ba81fde 100644 --- a/pkg/repository/v1/sqlcv1/payload-store.sql +++ b/pkg/repository/v1/sqlcv1/payload-store.sql @@ -93,15 +93,31 @@ WITH tenants AS ( @partitionNumber::INT ) ) AS tenant_id +), wal_records AS ( + SELECT * + FROM v1_payload_wal + WHERE tenant_id = ANY(SELECT tenant_id FROM tenants) + ORDER BY offload_at + LIMIT @pollLimit::INT + FOR UPDATE SKIP LOCKED +), wal_records_without_payload AS ( + SELECT * + FROM wal_records wr + WHERE NOT EXISTS ( + SELECT 1 + FROM v1_payload p + WHERE (p.tenant_id, p.inserted_at, p.id, p.type) = (wr.tenant_id, wr.payload_inserted_at, wr.payload_id, wr.payload_type) + ) +), deleted_wal_records AS ( + DELETE FROM v1_payload_wal + WHERE (offload_at, payload_id, payload_inserted_at, payload_type, tenant_id) IN ( + SELECT offload_at, payload_id, payload_inserted_at, payload_type, tenant_id + FROM wal_records_without_payload + ) ) - -SELECT * -FROM v1_payload_wal -WHERE tenant_id = ANY(SELECT tenant_id FROM tenants) -ORDER BY offload_at -LIMIT @pollLimit::INT -FOR UPDATE SKIP LOCKED -; +SELECT wr.*, p.location, p.inline_content +FROM wal_records wr +JOIN v1_payload p ON (p.tenant_id, p.inserted_at, p.id, p.type) = (wr.tenant_id, wr.payload_inserted_at, wr.payload_id, wr.payload_type); -- name: SetPayloadExternalKeys :many WITH inputs AS ( diff --git a/pkg/repository/v1/sqlcv1/payload-store.sql.go b/pkg/repository/v1/sqlcv1/payload-store.sql.go index 30497c320..a44f8bf44 100644 --- a/pkg/repository/v1/sqlcv1/payload-store.sql.go +++ b/pkg/repository/v1/sqlcv1/payload-store.sql.go @@ -78,33 +78,61 @@ const pollPayloadWALForRecordsToReplicate = `-- name: PollPayloadWALForRecordsTo WITH tenants AS ( SELECT UNNEST( find_matching_tenants_in_payload_wal_partition( - $2::INT + $1::INT ) ) AS tenant_id +), wal_records AS ( + SELECT tenant_id, offload_at, payload_id, payload_inserted_at, payload_type, operation + FROM v1_payload_wal + WHERE tenant_id = ANY(SELECT tenant_id FROM tenants) + ORDER BY offload_at + LIMIT $2::INT + FOR UPDATE SKIP LOCKED +), wal_records_without_payload AS ( + SELECT tenant_id, offload_at, payload_id, payload_inserted_at, payload_type, operation + FROM wal_records wr + WHERE NOT EXISTS ( + SELECT 1 + FROM v1_payload p + WHERE (p.tenant_id, p.inserted_at, p.id, p.type) = (wr.tenant_id, wr.payload_inserted_at, wr.payload_id, wr.payload_type) + ) +), deleted_wal_records AS ( + DELETE FROM v1_payload_wal + WHERE (offload_at, payload_id, payload_inserted_at, payload_type, tenant_id) IN ( + SELECT offload_at, payload_id, payload_inserted_at, payload_type, tenant_id + FROM wal_records_without_payload + ) ) - -SELECT tenant_id, offload_at, payload_id, payload_inserted_at, payload_type, operation -FROM v1_payload_wal -WHERE tenant_id = ANY(SELECT tenant_id FROM tenants) -ORDER BY offload_at -LIMIT $1::INT -FOR UPDATE SKIP LOCKED +SELECT wr.tenant_id, wr.offload_at, wr.payload_id, wr.payload_inserted_at, wr.payload_type, wr.operation, p.location, p.inline_content +FROM wal_records wr +JOIN v1_payload p ON (p.tenant_id, p.inserted_at, p.id, p.type) = (wr.tenant_id, wr.payload_inserted_at, wr.payload_id, wr.payload_type) ` type PollPayloadWALForRecordsToReplicateParams struct { - Polllimit int32 `json:"polllimit"` Partitionnumber int32 `json:"partitionnumber"` + Polllimit int32 `json:"polllimit"` } -func (q *Queries) PollPayloadWALForRecordsToReplicate(ctx context.Context, db DBTX, arg PollPayloadWALForRecordsToReplicateParams) ([]*V1PayloadWal, error) { - rows, err := db.Query(ctx, pollPayloadWALForRecordsToReplicate, arg.Polllimit, arg.Partitionnumber) +type PollPayloadWALForRecordsToReplicateRow struct { + TenantID pgtype.UUID `json:"tenant_id"` + OffloadAt pgtype.Timestamptz `json:"offload_at"` + PayloadID int64 `json:"payload_id"` + PayloadInsertedAt pgtype.Timestamptz `json:"payload_inserted_at"` + PayloadType V1PayloadType `json:"payload_type"` + Operation V1PayloadWalOperation `json:"operation"` + Location V1PayloadLocation `json:"location"` + InlineContent []byte `json:"inline_content"` +} + +func (q *Queries) PollPayloadWALForRecordsToReplicate(ctx context.Context, db DBTX, arg PollPayloadWALForRecordsToReplicateParams) ([]*PollPayloadWALForRecordsToReplicateRow, error) { + rows, err := db.Query(ctx, pollPayloadWALForRecordsToReplicate, arg.Partitionnumber, arg.Polllimit) if err != nil { return nil, err } defer rows.Close() - var items []*V1PayloadWal + var items []*PollPayloadWALForRecordsToReplicateRow for rows.Next() { - var i V1PayloadWal + var i PollPayloadWALForRecordsToReplicateRow if err := rows.Scan( &i.TenantID, &i.OffloadAt, @@ -112,6 +140,8 @@ func (q *Queries) PollPayloadWALForRecordsToReplicate(ctx context.Context, db DB &i.PayloadInsertedAt, &i.PayloadType, &i.Operation, + &i.Location, + &i.InlineContent, ); err != nil { return nil, err } diff --git a/pkg/repository/v1/sqlcv1/tasks.sql b/pkg/repository/v1/sqlcv1/tasks.sql index b8cbec329..d8517631b 100644 --- a/pkg/repository/v1/sqlcv1/tasks.sql +++ b/pkg/repository/v1/sqlcv1/tasks.sql @@ -919,6 +919,9 @@ ANALYZE v1_task; -- name: AnalyzeV1TaskEvent :exec ANALYZE v1_task_event; +-- name: AnalyzeV1Dag :exec +ANALYZE v1_dag; + -- name: CleanupV1TaskRuntime :execresult WITH locked_trs AS ( SELECT vtr.task_id, vtr.task_inserted_at, vtr.retry_count diff --git a/pkg/repository/v1/sqlcv1/tasks.sql.go b/pkg/repository/v1/sqlcv1/tasks.sql.go index 59c8e04be..b30627aef 100644 --- a/pkg/repository/v1/sqlcv1/tasks.sql.go +++ b/pkg/repository/v1/sqlcv1/tasks.sql.go @@ -12,6 +12,15 @@ import ( "github.com/jackc/pgx/v5/pgtype" ) +const analyzeV1Dag = `-- name: AnalyzeV1Dag :exec +ANALYZE v1_dag +` + +func (q *Queries) AnalyzeV1Dag(ctx context.Context, db DBTX) error { + _, err := db.Exec(ctx, analyzeV1Dag) + return err +} + const analyzeV1Task = `-- name: AnalyzeV1Task :exec ANALYZE v1_task ` diff --git a/pkg/repository/v1/task.go b/pkg/repository/v1/task.go index 9e9f99053..2d0bae921 100644 --- a/pkg/repository/v1/task.go +++ b/pkg/repository/v1/task.go @@ -3621,6 +3621,12 @@ func (r *TaskRepositoryImpl) AnalyzeTaskTables(ctx context.Context) error { return fmt.Errorf("error analyzing v1_task_event: %v", err) } + err = r.queries.AnalyzeV1Dag(ctx, tx) + + if err != nil { + return fmt.Errorf("error analyzing v1_dag: %v", err) + } + err = r.queries.AnalyzeV1Payload(ctx, tx) if err != nil { diff --git a/sql/schema/v1-core.sql b/sql/schema/v1-core.sql index 31d53afb9..129cf6490 100644 --- a/sql/schema/v1-core.sql +++ b/sql/schema/v1-core.sql @@ -1710,8 +1710,14 @@ BEGIN EXECUTE format( 'SELECT ARRAY( - SELECT DISTINCT e.tenant_id - FROM %I e + SELECT t.id + FROM "Tenant" t + WHERE EXISTS ( + SELECT 1 + FROM %I e + WHERE e.tenant_id = t.id + LIMIT 1 + ) )', partition_table) INTO result; @@ -1733,9 +1739,15 @@ BEGIN EXECUTE format( 'SELECT ARRAY( - SELECT DISTINCT e.tenant_id - FROM %I e - WHERE e.cut_over_at <= NOW() + SELECT t.id + FROM "Tenant" t + WHERE EXISTS ( + SELECT 1 + FROM %I e + WHERE e.tenant_id = t.id + AND e.cut_over_at <= NOW() + LIMIT 1 + ) )', partition_table) INTO result;