From 23db2a4fac1bf31cded684e55801fc40b7b04c81 Mon Sep 17 00:00:00 2001
From: matt
Date: Mon, 15 Dec 2025 13:07:51 -0500
Subject: [PATCH] Fix: Pagination by bounds (#2654)

* fix: pagination missing rows

* fix: separate functions

* fix: return both bounds from query

* fix: wiring

* fix: func

* fix: order col

* fix: bug

* fix: math is hard

* fix: more math

* fix: math and math and math

* fix: slightly more math

* fix: placeholders :facepalm:

* fix: where clause

* fix: math!

* fix: schema

* refactor: try with `CEIL`

* fix: mathin up a storm

* fix: I was actually a math major in college, who knew

* fix: copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* fix: copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---
 .../migrations/20251212170748_v1_0_59.sql     | 353 ++++++++++++++++++
 pkg/repository/v1/olap.go                     |  10 +-
 pkg/repository/v1/payloadstore.go             |  13 +-
 pkg/repository/v1/sqlcv1/olap.sql             |  31 +-
 pkg/repository/v1/sqlcv1/olap.sql.go          |  67 ++--
 pkg/repository/v1/sqlcv1/payload-store.sql    |  35 +-
 pkg/repository/v1/sqlcv1/payload-store.sql.go |  83 +++--
 sql/schema/v1-core.sql                        |  96 ++++-
 sql/schema/v1-olap.sql                        |  89 ++++-
 9 files changed, 660 insertions(+), 117 deletions(-)
 create mode 100644 cmd/hatchet-migrate/migrate/migrations/20251212170748_v1_0_59.sql

diff --git a/cmd/hatchet-migrate/migrate/migrations/20251212170748_v1_0_59.sql b/cmd/hatchet-migrate/migrate/migrations/20251212170748_v1_0_59.sql
new file mode 100644
index 000000000..a67f1bfdb
--- /dev/null
+++ b/cmd/hatchet-migrate/migrate/migrations/20251212170748_v1_0_59.sql
@@ -0,0 +1,353 @@
+-- +goose Up
+-- +goose StatementBegin
+DROP FUNCTION list_paginated_payloads_for_offload(date, int, uuid, timestamptz, bigint, v1_payload_type);
+CREATE OR REPLACE FUNCTION list_paginated_payloads_for_offload(
+    partition_date date,
+    last_tenant_id uuid,
+    last_inserted_at timestamptz,
+    last_id bigint,
+    last_type v1_payload_type,
+    next_tenant_id uuid,
+    next_inserted_at timestamptz,
+    next_id bigint,
+    next_type v1_payload_type
+) RETURNS TABLE (
+    tenant_id UUID,
+    id BIGINT,
+    inserted_at TIMESTAMPTZ,
+    external_id UUID,
+    type v1_payload_type,
+    location v1_payload_location,
+    external_location_key TEXT,
+    inline_content JSONB,
+    updated_at TIMESTAMPTZ
+)
+    LANGUAGE plpgsql AS
+$$
+DECLARE
+    partition_date_str varchar;
+    source_partition_name varchar;
+    query text;
+BEGIN
+    IF partition_date IS NULL THEN
+        RAISE EXCEPTION 'partition_date parameter cannot be NULL';
+    END IF;
+
+    SELECT to_char(partition_date, 'YYYYMMDD') INTO partition_date_str;
+    SELECT format('v1_payload_%s', partition_date_str) INTO source_partition_name;
+
+    IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = source_partition_name) THEN
+        RAISE EXCEPTION 'Partition % does not exist', source_partition_name;
+    END IF;
+
+    query := format('
+        SELECT tenant_id, id, inserted_at, external_id, type, location,
+            external_location_key, inline_content, updated_at
+        FROM %I
+        WHERE
+            (tenant_id, inserted_at, id, type) >= ($1, $2, $3, $4)
+            AND (tenant_id, inserted_at, id, type) <= ($5, $6, $7, $8)
+        ORDER BY tenant_id, inserted_at, id, type
+    ', source_partition_name);
+
+    RETURN QUERY EXECUTE query USING last_tenant_id, last_inserted_at, last_id, last_type, next_tenant_id, next_inserted_at, next_id, next_type;
+END;
+$$;
+
+DROP FUNCTION list_paginated_olap_payloads_for_offload(date, int, uuid, uuid, timestamptz);
+CREATE OR REPLACE FUNCTION list_paginated_olap_payloads_for_offload(
+    partition_date date,
+    last_tenant_id uuid,
+    last_external_id uuid,
+    last_inserted_at timestamptz,
+    next_tenant_id uuid,
+    next_external_id uuid,
+    next_inserted_at timestamptz
+) RETURNS TABLE (
+    tenant_id UUID,
+    external_id UUID,
+    location v1_payload_location_olap,
+    external_location_key TEXT,
+    inline_content JSONB,
+    inserted_at TIMESTAMPTZ,
+    updated_at TIMESTAMPTZ
+)
+    LANGUAGE plpgsql AS
+$$
+DECLARE
+    partition_date_str varchar;
+    source_partition_name varchar;
+    query text;
+BEGIN
+    IF partition_date IS NULL THEN
+        RAISE EXCEPTION 'partition_date parameter cannot be NULL';
+    END IF;
+
+    SELECT to_char(partition_date, 'YYYYMMDD') INTO partition_date_str;
+    SELECT format('v1_payloads_olap_%s', partition_date_str) INTO source_partition_name;
+
+    IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = source_partition_name) THEN
+        RAISE EXCEPTION 'Partition % does not exist', source_partition_name;
+    END IF;
+
+    query := format('
+        SELECT tenant_id, external_id, location, external_location_key, inline_content, inserted_at, updated_at
+        FROM %I
+        WHERE
+            (tenant_id, external_id, inserted_at) >= ($1, $2, $3)
+            AND (tenant_id, external_id, inserted_at) <= ($4, $5, $6)
+        ORDER BY tenant_id, external_id, inserted_at
+    ', source_partition_name);
+
+    RETURN QUERY EXECUTE query USING last_tenant_id, last_external_id, last_inserted_at, next_tenant_id, next_external_id, next_inserted_at;
+END;
+$$;
+
+CREATE OR REPLACE FUNCTION create_payload_offload_range_chunks(
+    partition_date date,
+    window_size int,
+    chunk_size int,
+    last_tenant_id uuid,
+    last_inserted_at timestamptz,
+    last_id bigint,
+    last_type v1_payload_type
+) RETURNS TABLE (
+    lower_tenant_id UUID,
+    lower_id BIGINT,
+    lower_inserted_at TIMESTAMPTZ,
+    lower_type v1_payload_type,
+    upper_tenant_id UUID,
+    upper_id BIGINT,
+    upper_inserted_at TIMESTAMPTZ,
+    upper_type v1_payload_type
+)
+    LANGUAGE plpgsql AS
+$$
+DECLARE
+    partition_date_str varchar;
+    source_partition_name varchar;
+    query text;
+BEGIN
+    IF partition_date IS NULL THEN
+        RAISE EXCEPTION 'partition_date parameter cannot be NULL';
+    END IF;
+
+    SELECT to_char(partition_date, 'YYYYMMDD') INTO partition_date_str;
+    SELECT format('v1_payload_%s', partition_date_str) INTO source_partition_name;
+
+    IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = source_partition_name) THEN
+        RAISE EXCEPTION 'Partition % does not exist', source_partition_name;
+    END IF;
+
+    query := format('
+        WITH paginated AS (
+            SELECT tenant_id, id, inserted_at, type, ROW_NUMBER() OVER (ORDER BY tenant_id, inserted_at, id, type) AS rn
+            FROM %I
+            WHERE (tenant_id, inserted_at, id, type) > ($1, $2, $3, $4)
+            ORDER BY tenant_id, inserted_at, id, type
+            LIMIT $5::INTEGER
+        ), lower_bounds AS (
+            SELECT (rn - 1)::INTEGER / $6::INTEGER AS batch_ix, tenant_id::UUID, id::BIGINT, inserted_at::TIMESTAMPTZ, type::v1_payload_type
+            FROM paginated
+            WHERE MOD(rn - 1, $6::INTEGER) = 0
+        ), upper_bounds AS (
+            SELECT
+                (rn - 1)::INTEGER / $6::INTEGER AS batch_ix,
+                tenant_id::UUID,
+                id::BIGINT,
+                inserted_at::TIMESTAMPTZ,
+                type::v1_payload_type
+            FROM paginated
+            WHERE MOD(rn, $6::INTEGER) = 0 OR rn = (SELECT MAX(rn) FROM paginated)
+        )
+
+        SELECT
+            lb.tenant_id AS lower_tenant_id,
+            lb.id AS lower_id,
+            lb.inserted_at AS lower_inserted_at,
+            lb.type AS lower_type,
+            ub.tenant_id AS upper_tenant_id,
+            ub.id AS upper_id,
+            ub.inserted_at AS upper_inserted_at,
+            ub.type AS upper_type
+        FROM lower_bounds lb
+        JOIN upper_bounds ub ON lb.batch_ix = ub.batch_ix
+        ORDER BY lb.tenant_id, lb.inserted_at, lb.id, lb.type
+    ', source_partition_name);
+
+    RETURN QUERY EXECUTE query USING last_tenant_id, last_inserted_at, last_id, last_type, window_size, chunk_size;
+END;
+$$;
+
+CREATE OR REPLACE FUNCTION create_olap_payload_offload_range_chunks(
+    partition_date date,
+    window_size int,
+    chunk_size int,
+    last_tenant_id uuid,
+    last_external_id uuid,
+    last_inserted_at timestamptz
+) RETURNS TABLE (
+    lower_tenant_id UUID,
+    lower_external_id UUID,
+    lower_inserted_at TIMESTAMPTZ,
+    upper_tenant_id UUID,
+    upper_external_id UUID,
+    upper_inserted_at TIMESTAMPTZ
+)
+    LANGUAGE plpgsql AS
+$$
+DECLARE
+    partition_date_str varchar;
+    source_partition_name varchar;
+    query text;
+BEGIN
+    IF partition_date IS NULL THEN
+        RAISE EXCEPTION 'partition_date parameter cannot be NULL';
+    END IF;
+
+    SELECT to_char(partition_date, 'YYYYMMDD') INTO partition_date_str;
+    SELECT format('v1_payloads_olap_%s', partition_date_str) INTO source_partition_name;
+
+    IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = source_partition_name) THEN
+        RAISE EXCEPTION 'Partition % does not exist', source_partition_name;
+    END IF;
+
+    query := format('
+        WITH paginated AS (
+            SELECT tenant_id, external_id, inserted_at, ROW_NUMBER() OVER (ORDER BY tenant_id, external_id, inserted_at) AS rn
+            FROM %I
+            WHERE (tenant_id, external_id, inserted_at) > ($1, $2, $3)
+            ORDER BY tenant_id, external_id, inserted_at
+            LIMIT $4::INTEGER
+        ), lower_bounds AS (
+            SELECT (rn - 1)::INTEGER / $5::INTEGER AS batch_ix, tenant_id::UUID, external_id::UUID, inserted_at::TIMESTAMPTZ
+            FROM paginated
+            WHERE MOD(rn - 1, $5::INTEGER) = 0
+        ), upper_bounds AS (
+            SELECT
+                (rn - 1)::INTEGER / $5::INTEGER AS batch_ix,
+                tenant_id::UUID,
+                external_id::UUID,
+                inserted_at::TIMESTAMPTZ
+            FROM paginated
+            WHERE MOD(rn, $5::INTEGER) = 0 OR rn = (SELECT MAX(rn) FROM paginated)
+        )
+
+        SELECT
+            lb.tenant_id AS lower_tenant_id,
+            lb.external_id AS lower_external_id,
+            lb.inserted_at AS lower_inserted_at,
+            ub.tenant_id AS upper_tenant_id,
+            ub.external_id AS upper_external_id,
+            ub.inserted_at AS upper_inserted_at
+        FROM lower_bounds lb
+        JOIN upper_bounds ub ON lb.batch_ix = ub.batch_ix
+        ORDER BY lb.tenant_id, lb.external_id, lb.inserted_at
+    ', source_partition_name);
+
+    RETURN QUERY EXECUTE query USING last_tenant_id, last_external_id, last_inserted_at, window_size, chunk_size;
+END;
+$$;
+-- +goose StatementEnd
+
+-- +goose Down
+-- +goose StatementBegin
+DROP FUNCTION IF EXISTS list_paginated_payloads_for_offload(date, uuid, timestamptz, bigint, v1_payload_type, uuid, timestamptz, bigint, v1_payload_type);
+DROP FUNCTION IF EXISTS list_paginated_olap_payloads_for_offload(date, uuid, uuid, timestamptz, uuid, uuid, timestamptz);
+CREATE OR REPLACE FUNCTION list_paginated_payloads_for_offload(
+    partition_date date,
+    limit_param int,
+    last_tenant_id uuid,
+    last_inserted_at timestamptz,
+    last_id bigint,
+    last_type v1_payload_type
+) RETURNS TABLE (
+    tenant_id UUID,
+    id BIGINT,
+    inserted_at TIMESTAMPTZ,
+    external_id UUID,
+    type v1_payload_type,
+    location v1_payload_location,
+    external_location_key TEXT,
+    inline_content JSONB,
+    updated_at TIMESTAMPTZ
+)
+    LANGUAGE plpgsql AS
+$$
+DECLARE
+    partition_date_str varchar;
+    source_partition_name varchar;
+    query text;
+BEGIN
+    IF partition_date IS NULL THEN
+        RAISE EXCEPTION 'partition_date parameter cannot be NULL';
+    END IF;
+
+    SELECT to_char(partition_date, 'YYYYMMDD') INTO partition_date_str;
+    SELECT format('v1_payload_%s', partition_date_str) INTO source_partition_name;
+
+    IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = source_partition_name) THEN
+        RAISE EXCEPTION 'Partition % does not exist', source_partition_name;
+    END IF;
+
+    query := format('
+        SELECT tenant_id, id, inserted_at, external_id, type, location,
+            external_location_key, inline_content, updated_at
+        FROM %I
+        WHERE (tenant_id, inserted_at, id, type) >= ($1, $2, $3, $4)
+        ORDER BY tenant_id, inserted_at, id, type
+        LIMIT $5
+    ', source_partition_name);
+
+    RETURN QUERY EXECUTE query USING last_tenant_id, last_inserted_at, last_id, last_type, limit_param;
+END;
+$$;
+
+CREATE OR REPLACE FUNCTION list_paginated_olap_payloads_for_offload(
+    partition_date date,
+    limit_param int,
+    last_tenant_id uuid,
+    last_external_id uuid,
+    last_inserted_at timestamptz
+) RETURNS TABLE (
+    tenant_id UUID,
+    external_id UUID,
+    location v1_payload_location_olap,
+    external_location_key TEXT,
+    inline_content JSONB,
+    inserted_at TIMESTAMPTZ,
+    updated_at TIMESTAMPTZ
+)
+    LANGUAGE plpgsql AS
+$$
+DECLARE
+    partition_date_str varchar;
+    source_partition_name varchar;
+    query text;
+BEGIN
+    IF partition_date IS NULL THEN
+        RAISE EXCEPTION 'partition_date parameter cannot be NULL';
+    END IF;
+
+    SELECT to_char(partition_date, 'YYYYMMDD') INTO partition_date_str;
+    SELECT format('v1_payloads_olap_%s', partition_date_str) INTO source_partition_name;
+
+    IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = source_partition_name) THEN
+        RAISE EXCEPTION 'Partition % does not exist', source_partition_name;
+    END IF;
+
+    query := format('
+        SELECT tenant_id, external_id, location, external_location_key, inline_content, inserted_at, updated_at
+        FROM %I
+        WHERE (tenant_id, external_id, inserted_at) >= ($1, $2, $3)
+        ORDER BY tenant_id, external_id, inserted_at
+        LIMIT $4
+    ', source_partition_name);
+
+    RETURN QUERY EXECUTE query USING last_tenant_id, last_external_id, last_inserted_at, limit_param;
+END;
+$$;
+
+DROP FUNCTION create_payload_offload_range_chunks(date, int, int, uuid, timestamptz, bigint, v1_payload_type);
+DROP FUNCTION create_olap_payload_offload_range_chunks(date, int, int, uuid, uuid, timestamptz);
+-- +goose StatementEnd
diff --git a/pkg/repository/v1/olap.go b/pkg/repository/v1/olap.go
index f70ab5e91..042bbfd41 100644
--- a/pkg/repository/v1/olap.go
+++ b/pkg/repository/v1/olap.go
@@ -2750,10 +2750,12 @@ func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context,
 		eg.Go(func() error {
 			payloads, err := p.queries.ListPaginatedOLAPPayloadsForOffload(ctx, p.pool, sqlcv1.ListPaginatedOLAPPayloadsForOffloadParams{
 				Partitiondate:  pgtype.Date(partitionDate),
-				Limitparam:     externalCutoverBatchSize,
-				Lasttenantid:   pr.TenantID,
-				Lastexternalid: pr.ExternalID,
-				Lastinsertedat: pr.InsertedAt,
+				Lasttenantid:   pr.LowerTenantID,
+				Lastexternalid: pr.LowerExternalID,
+				Lastinsertedat: pr.LowerInsertedAt,
+				Nexttenantid:   pr.UpperTenantID,
+				Nextexternalid: pr.UpperExternalID,
+				Nextinsertedat: pr.UpperInsertedAt,
 			})
 
 			if err != nil {
diff --git a/pkg/repository/v1/payloadstore.go b/pkg/repository/v1/payloadstore.go
index ed62876f5..6669837e1 100644
--- a/pkg/repository/v1/payloadstore.go
+++ b/pkg/repository/v1/payloadstore.go
@@ -481,11 +481,14 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Cont
 		eg.Go(func() error {
 			payloads, err := p.queries.ListPaginatedPayloadsForOffload(ctx, p.pool, sqlcv1.ListPaginatedPayloadsForOffloadParams{
 				Partitiondate:  pgtype.Date(partitionDate),
-				Limitparam:     p.externalCutoverBatchSize,
-				Lasttenantid:   pr.TenantID,
-				Lastinsertedat: pr.InsertedAt,
-				Lastid:         pr.ID,
-				Lasttype:       pr.Type,
+				Lasttenantid:   pr.LowerTenantID,
+				Lastinsertedat: pr.LowerInsertedAt,
+				Lastid:         pr.LowerID,
+				Lasttype:       pr.LowerType,
+				Nexttenantid:   pr.UpperTenantID,
+				Nextinsertedat: pr.UpperInsertedAt,
+				Nextid:         pr.UpperID,
+				Nexttype:       pr.UpperType,
 			})
 
 			if err != nil {
diff --git a/pkg/repository/v1/sqlcv1/olap.sql b/pkg/repository/v1/sqlcv1/olap.sql
index 09c9f2e32..8cef94349 100644
--- a/pkg/repository/v1/sqlcv1/olap.sql
+++ b/pkg/repository/v1/sqlcv1/olap.sql
@@ -1851,10 +1851,12 @@ WITH payloads AS (
         (p).*
     FROM list_paginated_olap_payloads_for_offload(
         @partitionDate::DATE,
-        @limitParam::INT,
         @lastTenantId::UUID,
         @lastExternalId::UUID,
-        @lastInsertedAt::TIMESTAMPTZ
+        @lastInsertedAt::TIMESTAMPTZ,
+        @nextTenantId::UUID,
+        @nextExternalId::UUID,
+        @nextInsertedAt::TIMESTAMPTZ
     ) p
 )
 SELECT
@@ -1868,30 +1870,27 @@ SELECT
 FROM payloads;
 
 -- name: CreateOLAPPayloadRangeChunks :many
-WITH payloads AS (
+WITH chunks AS (
     SELECT
         (p).*
-    FROM list_paginated_olap_payloads_for_offload(
+    FROM create_olap_payload_offload_range_chunks(
         @partitionDate::DATE,
         @windowSize::INTEGER,
+        @chunkSize::INTEGER,
         @lastTenantId::UUID,
         @lastExternalId::UUID,
         @lastInsertedAt::TIMESTAMPTZ
     ) p
-), with_rows AS (
-    SELECT
-        tenant_id::UUID,
-        external_id::UUID,
-        inserted_at::TIMESTAMPTZ,
-        ROW_NUMBER() OVER (ORDER BY tenant_id, external_id, inserted_at) AS rn
-    FROM payloads
 )
 
-SELECT *
-FROM with_rows
--- row numbers are one-indexed
-WHERE MOD(rn, @chunkSize::INTEGER) = 1
-ORDER BY tenant_id, external_id, inserted_at
+SELECT
+    lower_tenant_id::UUID,
+    lower_external_id::UUID,
+    lower_inserted_at::TIMESTAMPTZ,
+    upper_tenant_id::UUID,
+    upper_external_id::UUID,
+    upper_inserted_at::TIMESTAMPTZ
+FROM chunks
 ;
 
 -- name: CreateV1PayloadOLAPCutoverTemporaryTable :exec
diff --git a/pkg/repository/v1/sqlcv1/olap.sql.go b/pkg/repository/v1/sqlcv1/olap.sql.go
index 743e0c8ea..dd8bb6a09 100644
--- a/pkg/repository/v1/sqlcv1/olap.sql.go
+++ b/pkg/repository/v1/sqlcv1/olap.sql.go
@@ -323,53 +323,52 @@ func (q *Queries) CreateOLAPPartitions(ctx context.Context, db DBTX, arg CreateO
 }
 
 const createOLAPPayloadRangeChunks = `-- name: CreateOLAPPayloadRangeChunks :many
-WITH payloads AS (
+WITH chunks AS (
     SELECT
         (p).*
-    FROM list_paginated_olap_payloads_for_offload(
-        $2::DATE,
+    FROM create_olap_payload_offload_range_chunks(
+        $1::DATE,
+        $2::INTEGER,
         $3::INTEGER,
         $4::UUID,
         $5::UUID,
         $6::TIMESTAMPTZ
     ) p
-), with_rows AS (
-    SELECT
-        tenant_id::UUID,
-        external_id::UUID,
-        inserted_at::TIMESTAMPTZ,
-        ROW_NUMBER() OVER (ORDER BY tenant_id, external_id, inserted_at) AS rn
-    FROM payloads
 )
 
-SELECT tenant_id, external_id, inserted_at, rn
-FROM with_rows
-WHERE MOD(rn, $1::INTEGER) = 1
-ORDER BY tenant_id, external_id, inserted_at
+SELECT
+    lower_tenant_id::UUID,
+    lower_external_id::UUID,
+    lower_inserted_at::TIMESTAMPTZ,
+    upper_tenant_id::UUID,
+    upper_external_id::UUID,
+    upper_inserted_at::TIMESTAMPTZ
+FROM chunks
 `
 
 type CreateOLAPPayloadRangeChunksParams struct {
-	Chunksize      int32              `json:"chunksize"`
 	Partitiondate  pgtype.Date        `json:"partitiondate"`
 	Windowsize     int32              `json:"windowsize"`
+	Chunksize      int32              `json:"chunksize"`
 	Lasttenantid   pgtype.UUID        `json:"lasttenantid"`
 	Lastexternalid pgtype.UUID        `json:"lastexternalid"`
 	Lastinsertedat pgtype.Timestamptz `json:"lastinsertedat"`
 }
 
 type CreateOLAPPayloadRangeChunksRow struct {
-	TenantID   pgtype.UUID        `json:"tenant_id"`
-	ExternalID pgtype.UUID        `json:"external_id"`
-	InsertedAt pgtype.Timestamptz `json:"inserted_at"`
-	Rn         int64              `json:"rn"`
+	LowerTenantID   pgtype.UUID        `json:"lower_tenant_id"`
+	LowerExternalID pgtype.UUID        `json:"lower_external_id"`
+	LowerInsertedAt pgtype.Timestamptz `json:"lower_inserted_at"`
+	UpperTenantID   pgtype.UUID        `json:"upper_tenant_id"`
+	UpperExternalID pgtype.UUID        `json:"upper_external_id"`
+	UpperInsertedAt pgtype.Timestamptz `json:"upper_inserted_at"`
 }
 
-// row numbers are one-indexed
 func (q *Queries) CreateOLAPPayloadRangeChunks(ctx context.Context, db DBTX, arg CreateOLAPPayloadRangeChunksParams) ([]*CreateOLAPPayloadRangeChunksRow, error) {
 	rows, err := db.Query(ctx, createOLAPPayloadRangeChunks,
-		arg.Chunksize,
 		arg.Partitiondate,
 		arg.Windowsize,
+		arg.Chunksize,
 		arg.Lasttenantid,
 		arg.Lastexternalid,
 		arg.Lastinsertedat,
@@ -382,10 +381,12 @@ func (q *Queries) CreateOLAPPayloadRangeChunks(ctx context.Context, db DBTX, arg
 	for rows.Next() {
 		var i CreateOLAPPayloadRangeChunksRow
 		if err := rows.Scan(
-			&i.TenantID,
-			&i.ExternalID,
-			&i.InsertedAt,
-			&i.Rn,
+			&i.LowerTenantID,
+			&i.LowerExternalID,
+			&i.LowerInsertedAt,
+			&i.UpperTenantID,
+			&i.UpperExternalID,
+			&i.UpperInsertedAt,
 		); err != nil {
 			return nil, err
 		}
@@ -1329,10 +1330,12 @@ WITH payloads AS (
         (p).*
     FROM list_paginated_olap_payloads_for_offload(
         $1::DATE,
-        $2::INT,
+        $2::UUID,
         $3::UUID,
-        $4::UUID,
-        $5::TIMESTAMPTZ
+        $4::TIMESTAMPTZ,
+        $5::UUID,
+        $6::UUID,
+        $7::TIMESTAMPTZ
     ) p
 )
 SELECT
@@ -1348,10 +1351,12 @@ FROM payloads
 
 type ListPaginatedOLAPPayloadsForOffloadParams struct {
 	Partitiondate  pgtype.Date        `json:"partitiondate"`
-	Limitparam     int32              `json:"limitparam"`
 	Lasttenantid   pgtype.UUID        `json:"lasttenantid"`
 	Lastexternalid pgtype.UUID        `json:"lastexternalid"`
 	Lastinsertedat pgtype.Timestamptz `json:"lastinsertedat"`
+	Nexttenantid   pgtype.UUID        `json:"nexttenantid"`
+	Nextexternalid pgtype.UUID        `json:"nextexternalid"`
+	Nextinsertedat pgtype.Timestamptz `json:"nextinsertedat"`
 }
 
 type ListPaginatedOLAPPayloadsForOffloadRow struct {
@@ -1367,10 +1372,12 @@ type ListPaginatedOLAPPayloadsForOffloadRow struct {
 func (q *Queries) ListPaginatedOLAPPayloadsForOffload(ctx context.Context, db DBTX, arg ListPaginatedOLAPPayloadsForOffloadParams) ([]*ListPaginatedOLAPPayloadsForOffloadRow, error) {
 	rows, err := db.Query(ctx, listPaginatedOLAPPayloadsForOffload,
 		arg.Partitiondate,
-		arg.Limitparam,
 		arg.Lasttenantid,
 		arg.Lastexternalid,
 		arg.Lastinsertedat,
+		arg.Nexttenantid,
+		arg.Nextexternalid,
+		arg.Nextinsertedat,
 	)
 	if err != nil {
 		return nil, err
diff --git a/pkg/repository/v1/sqlcv1/payload-store.sql b/pkg/repository/v1/sqlcv1/payload-store.sql
index 5ca429f51..1c4e6b42b 100644
--- a/pkg/repository/v1/sqlcv1/payload-store.sql
+++ b/pkg/repository/v1/sqlcv1/payload-store.sql
@@ -222,11 +222,14 @@ WITH payloads AS (
         (p).*
     FROM list_paginated_payloads_for_offload(
         @partitionDate::DATE,
-        @limitParam::INT,
         @lastTenantId::UUID,
         @lastInsertedAt::TIMESTAMPTZ,
         @lastId::BIGINT,
-        @lastType::v1_payload_type
+        @lastType::v1_payload_type,
+        @nextTenantId::UUID,
+        @nextInsertedAt::TIMESTAMPTZ,
+        @nextId::BIGINT,
+        @nextType::v1_payload_type
     ) p
 )
 SELECT
@@ -242,32 +245,30 @@ SELECT
 FROM payloads;
 
 -- name: CreatePayloadRangeChunks :many
-WITH payloads AS (
+WITH chunks AS (
     SELECT
         (p).*
-    FROM list_paginated_payloads_for_offload(
+    FROM create_payload_offload_range_chunks(
         @partitionDate::DATE,
         @windowSize::INTEGER,
+        @chunkSize::INTEGER,
         @lastTenantId::UUID,
         @lastInsertedAt::TIMESTAMPTZ,
         @lastId::BIGINT,
         @lastType::v1_payload_type
     ) p
-), with_rows AS (
-    SELECT
-        tenant_id::UUID,
-        id::BIGINT,
-        inserted_at::TIMESTAMPTZ,
-        type::v1_payload_type,
-        ROW_NUMBER() OVER (ORDER BY tenant_id, inserted_at, id, type) AS rn
-    FROM payloads
 )
 
-SELECT *
-FROM with_rows
--- row numbers are one-indexed
-WHERE MOD(rn, @chunkSize::INTEGER) = 1
-ORDER BY tenant_id, inserted_at, id, type
+SELECT
+    lower_tenant_id::UUID,
+    lower_id::BIGINT,
+    lower_inserted_at::TIMESTAMPTZ,
+    lower_type::v1_payload_type,
+    upper_tenant_id::UUID,
+    upper_id::BIGINT,
+    upper_inserted_at::TIMESTAMPTZ,
+    upper_type::v1_payload_type
+FROM chunks
 ;
 
 -- name: CreateV1PayloadCutoverTemporaryTable :exec
diff --git a/pkg/repository/v1/sqlcv1/payload-store.sql.go b/pkg/repository/v1/sqlcv1/payload-store.sql.go
index df80f9547..3b49c32d1 100644
--- a/pkg/repository/v1/sqlcv1/payload-store.sql.go
+++ b/pkg/repository/v1/sqlcv1/payload-store.sql.go
@@ -108,37 +108,36 @@ func (q *Queries) AnalyzeV1Payload(ctx context.Context, db DBTX) error {
 }
 
 const createPayloadRangeChunks = `-- name: CreatePayloadRangeChunks :many
-WITH payloads AS (
+WITH chunks AS (
     SELECT
         (p).*
-    FROM list_paginated_payloads_for_offload(
-        $2::DATE,
+    FROM create_payload_offload_range_chunks(
+        $1::DATE,
+        $2::INTEGER,
         $3::INTEGER,
         $4::UUID,
         $5::TIMESTAMPTZ,
         $6::BIGINT,
         $7::v1_payload_type
     ) p
-), with_rows AS (
-    SELECT
-        tenant_id::UUID,
-        id::BIGINT,
-        inserted_at::TIMESTAMPTZ,
-        type::v1_payload_type,
-        ROW_NUMBER() OVER (ORDER BY tenant_id, inserted_at, id, type) AS rn
-    FROM payloads
 )
 
-SELECT tenant_id, id, inserted_at, type, rn
-FROM with_rows
-WHERE MOD(rn, $1::INTEGER) = 1
-ORDER BY tenant_id, inserted_at, id, type
+SELECT
+    lower_tenant_id::UUID,
+    lower_id::BIGINT,
+    lower_inserted_at::TIMESTAMPTZ,
+    lower_type::v1_payload_type,
+    upper_tenant_id::UUID,
+    upper_id::BIGINT,
+    upper_inserted_at::TIMESTAMPTZ,
+    upper_type::v1_payload_type
+FROM chunks
 `
 
 type CreatePayloadRangeChunksParams struct {
-	Chunksize      int32              `json:"chunksize"`
 	Partitiondate  pgtype.Date        `json:"partitiondate"`
 	Windowsize     int32              `json:"windowsize"`
+	Chunksize      int32              `json:"chunksize"`
 	Lasttenantid   pgtype.UUID        `json:"lasttenantid"`
 	Lastinsertedat pgtype.Timestamptz `json:"lastinsertedat"`
 	Lastid         int64              `json:"lastid"`
@@ -146,19 +145,21 @@ type CreatePayloadRangeChunksParams struct {
 }
 
 type CreatePayloadRangeChunksRow struct {
-	TenantID   pgtype.UUID        `json:"tenant_id"`
-	ID         int64              `json:"id"`
-	InsertedAt pgtype.Timestamptz `json:"inserted_at"`
-	Type       V1PayloadType      `json:"type"`
-	Rn         int64              `json:"rn"`
+	LowerTenantID   pgtype.UUID        `json:"lower_tenant_id"`
+	LowerID         int64              `json:"lower_id"`
+	LowerInsertedAt pgtype.Timestamptz `json:"lower_inserted_at"`
+	LowerType       V1PayloadType      `json:"lower_type"`
+	UpperTenantID   pgtype.UUID        `json:"upper_tenant_id"`
+	UpperID         int64              `json:"upper_id"`
+	UpperInsertedAt pgtype.Timestamptz `json:"upper_inserted_at"`
+	UpperType       V1PayloadType      `json:"upper_type"`
 }
 
-// row numbers are one-indexed
 func (q *Queries) CreatePayloadRangeChunks(ctx context.Context, db DBTX, arg CreatePayloadRangeChunksParams) ([]*CreatePayloadRangeChunksRow, error) {
 	rows, err := db.Query(ctx, createPayloadRangeChunks,
-		arg.Chunksize,
 		arg.Partitiondate,
 		arg.Windowsize,
+		arg.Chunksize,
 		arg.Lasttenantid,
 		arg.Lastinsertedat,
 		arg.Lastid,
@@ -172,11 +173,14 @@ func (q *Queries) CreatePayloadRangeChunks(ctx context.Context, db DBTX, arg Cre
 	for rows.Next() {
 		var i CreatePayloadRangeChunksRow
 		if err := rows.Scan(
-			&i.TenantID,
-			&i.ID,
-			&i.InsertedAt,
-			&i.Type,
-			&i.Rn,
+			&i.LowerTenantID,
+			&i.LowerID,
+			&i.LowerInsertedAt,
+			&i.LowerType,
+			&i.UpperTenantID,
+			&i.UpperID,
+			&i.UpperInsertedAt,
+			&i.UpperType,
 		); err != nil {
 			return nil, err
 		}
@@ -257,11 +261,14 @@ WITH payloads AS (
         (p).*
     FROM list_paginated_payloads_for_offload(
         $1::DATE,
-        $2::INT,
-        $3::UUID,
-        $4::TIMESTAMPTZ,
-        $5::BIGINT,
-        $6::v1_payload_type
+        $2::UUID,
+        $3::TIMESTAMPTZ,
+        $4::BIGINT,
+        $5::v1_payload_type,
+        $6::UUID,
+        $7::TIMESTAMPTZ,
+        $8::BIGINT,
+        $9::v1_payload_type
     ) p
 )
 SELECT
@@ -279,11 +286,14 @@ FROM payloads
 
 type ListPaginatedPayloadsForOffloadParams struct {
 	Partitiondate  pgtype.Date        `json:"partitiondate"`
-	Limitparam     int32              `json:"limitparam"`
 	Lasttenantid   pgtype.UUID        `json:"lasttenantid"`
 	Lastinsertedat pgtype.Timestamptz `json:"lastinsertedat"`
 	Lastid         int64              `json:"lastid"`
 	Lasttype       V1PayloadType      `json:"lasttype"`
+	Nexttenantid   pgtype.UUID        `json:"nexttenantid"`
+	Nextinsertedat pgtype.Timestamptz `json:"nextinsertedat"`
+	Nextid         int64              `json:"nextid"`
+	Nexttype       V1PayloadType      `json:"nexttype"`
 }
 
 type ListPaginatedPayloadsForOffloadRow struct {
@@ -301,11 +311,14 @@ type ListPaginatedPayloadsForOffloadRow struct {
 func (q *Queries) ListPaginatedPayloadsForOffload(ctx context.Context, db DBTX, arg ListPaginatedPayloadsForOffloadParams) ([]*ListPaginatedPayloadsForOffloadRow, error) {
 	rows, err := db.Query(ctx, listPaginatedPayloadsForOffload,
 		arg.Partitiondate,
-		arg.Limitparam,
 		arg.Lasttenantid,
 		arg.Lastinsertedat,
 		arg.Lastid,
 		arg.Lasttype,
+		arg.Nexttenantid,
+		arg.Nextinsertedat,
+		arg.Nextid,
+		arg.Nexttype,
 	)
 	if err != nil {
 		return nil, err
diff --git a/sql/schema/v1-core.sql b/sql/schema/v1-core.sql
index e0fe6adf2..bdd9ff526 100644
--- a/sql/schema/v1-core.sql
+++ b/sql/schema/v1-core.sql
@@ -1878,11 +1878,14 @@ $$;
 
 CREATE OR REPLACE FUNCTION list_paginated_payloads_for_offload(
     partition_date date,
-    limit_param int,
     last_tenant_id uuid,
     last_inserted_at timestamptz,
     last_id bigint,
-    last_type v1_payload_type
+    last_type v1_payload_type,
+    next_tenant_id uuid,
+    next_inserted_at timestamptz,
+    next_id bigint,
+    next_type v1_payload_type
 ) RETURNS TABLE (
     tenant_id UUID,
     id BIGINT,
@@ -1916,12 +1919,95 @@ BEGIN
         SELECT tenant_id, id, inserted_at, external_id, type, location,
             external_location_key, inline_content, updated_at
         FROM %I
-        WHERE (tenant_id, inserted_at, id, type) >= ($1, $2, $3, $4)
+        WHERE
+            (tenant_id, inserted_at, id, type) >= ($1, $2, $3, $4)
+            AND (tenant_id, inserted_at, id, type) <= ($5, $6, $7, $8)
         ORDER BY tenant_id, inserted_at, id, type
-        LIMIT $5
     ', source_partition_name);
 
-    RETURN QUERY EXECUTE query USING last_tenant_id, last_inserted_at, last_id, last_type, limit_param;
+    RETURN QUERY EXECUTE query USING last_tenant_id, last_inserted_at, last_id, last_type, next_tenant_id, next_inserted_at, next_id, next_type;
+END;
+$$;
+
+CREATE OR REPLACE FUNCTION create_payload_offload_range_chunks(
+    partition_date date,
+    window_size int,
+    chunk_size int,
+    last_tenant_id uuid,
+    last_inserted_at timestamptz,
+    last_id bigint,
+    last_type v1_payload_type
+) RETURNS TABLE (
+    lower_tenant_id UUID,
+    lower_id BIGINT,
+    lower_inserted_at TIMESTAMPTZ,
+    lower_type v1_payload_type,
+    upper_tenant_id UUID,
+    upper_id BIGINT,
+    upper_inserted_at TIMESTAMPTZ,
+    upper_type v1_payload_type
+)
+    LANGUAGE plpgsql AS
+$$
+DECLARE
+    partition_date_str varchar;
+    source_partition_name varchar;
+    query text;
+BEGIN
+    IF partition_date IS NULL THEN
+        RAISE EXCEPTION 'partition_date parameter cannot be NULL';
+    END IF;
+
+    SELECT to_char(partition_date, 'YYYYMMDD') INTO partition_date_str;
+    SELECT format('v1_payload_%s', partition_date_str) INTO source_partition_name;
+
+    IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = source_partition_name) THEN
+        RAISE EXCEPTION 'Partition % does not exist', source_partition_name;
+    END IF;
+
+    query := format('
+        WITH paginated AS (
+            SELECT tenant_id, id, inserted_at, type, ROW_NUMBER() OVER (ORDER BY tenant_id, inserted_at, id, type) AS rn
+            FROM %I
+            WHERE (tenant_id, inserted_at, id, type) > ($1, $2, $3, $4)
+            ORDER BY tenant_id, inserted_at, id, type
+            LIMIT $5::INTEGER
+        ), lower_bounds AS (
+            SELECT (rn - 1)::INTEGER / $6::INTEGER AS batch_ix, tenant_id::UUID, id::BIGINT, inserted_at::TIMESTAMPTZ, type::v1_payload_type
+            FROM paginated
+            WHERE MOD(rn - 1, $6::INTEGER) = 0
+        ), upper_bounds AS (
+            SELECT
+                -- `batch_ix` is the zero-indexed chunk number, computed as `(rn - 1) / chunk_size` with integer division,
+                -- exactly as in `lower_bounds`, so that the lower and upper bound of each chunk share the same index.
+                -- E.g. with 5 rows in the window and a chunk size of 2, the upper bound rows are rn = 2, 4 and 5, which
+                -- map to batches 0, 1 and 2. Subtracting 1 before dividing (rather than using `MOD(rn, chunk_size) = 1`
+                -- and `CEIL`) also keeps a chunk size of 1 working, where every row is both a lower and an upper bound.
+                (rn - 1)::INTEGER / $6::INTEGER AS batch_ix,
+                tenant_id::UUID,
+                id::BIGINT,
+                inserted_at::TIMESTAMPTZ,
+                type::v1_payload_type
+            FROM paginated
+            -- We want to include either the last row of each batch, or the last row of the entire paginated set, which may not line up with a batch end.
+            WHERE MOD(rn, $6::INTEGER) = 0 OR rn = (SELECT MAX(rn) FROM paginated)
+        )
+
+        SELECT
+            lb.tenant_id AS lower_tenant_id,
+            lb.id AS lower_id,
+            lb.inserted_at AS lower_inserted_at,
+            lb.type AS lower_type,
+            ub.tenant_id AS upper_tenant_id,
+            ub.id AS upper_id,
+            ub.inserted_at AS upper_inserted_at,
+            ub.type AS upper_type
+        FROM lower_bounds lb
+        JOIN upper_bounds ub ON lb.batch_ix = ub.batch_ix
+        ORDER BY lb.tenant_id, lb.inserted_at, lb.id, lb.type
+    ', source_partition_name);
+
+    RETURN QUERY EXECUTE query USING last_tenant_id, last_inserted_at, last_id, last_type, window_size, chunk_size;
 END;
 $$;
 
diff --git a/sql/schema/v1-olap.sql b/sql/schema/v1-olap.sql
index 967467fef..0f772dc4a 100644
--- a/sql/schema/v1-olap.sql
+++ b/sql/schema/v1-olap.sql
@@ -920,10 +920,12 @@ $$;
 
 CREATE OR REPLACE FUNCTION list_paginated_olap_payloads_for_offload(
     partition_date date,
-    limit_param int,
     last_tenant_id uuid,
     last_external_id uuid,
-    last_inserted_at timestamptz
+    last_inserted_at timestamptz,
+    next_tenant_id uuid,
+    next_external_id uuid,
+    next_inserted_at timestamptz
 ) RETURNS TABLE (
     tenant_id UUID,
     external_id UUID,
@@ -954,12 +956,89 @@ BEGIN
     query := format('
         SELECT tenant_id, external_id, location, external_location_key, inline_content, inserted_at, updated_at
         FROM %I
-        WHERE (tenant_id, external_id, inserted_at) >= ($1, $2, $3)
+        WHERE
+            (tenant_id, external_id, inserted_at) >= ($1, $2, $3)
+            AND (tenant_id, external_id, inserted_at) <= ($4, $5, $6)
         ORDER BY tenant_id, external_id, inserted_at
-        LIMIT $4
     ', source_partition_name);
 
-    RETURN QUERY EXECUTE query USING last_tenant_id, last_external_id, last_inserted_at, limit_param;
+    RETURN QUERY EXECUTE query USING last_tenant_id, last_external_id, last_inserted_at, next_tenant_id, next_external_id, next_inserted_at;
+END;
+$$;
+
+CREATE OR REPLACE FUNCTION create_olap_payload_offload_range_chunks(
+    partition_date date,
+    window_size int,
+    chunk_size int,
+    last_tenant_id uuid,
+    last_external_id uuid,
+    last_inserted_at timestamptz
+) RETURNS TABLE (
+    lower_tenant_id UUID,
+    lower_external_id UUID,
+    lower_inserted_at TIMESTAMPTZ,
+    upper_tenant_id UUID,
+    upper_external_id UUID,
+    upper_inserted_at TIMESTAMPTZ
+)
+    LANGUAGE plpgsql AS
+$$
+DECLARE
+    partition_date_str varchar;
+    source_partition_name varchar;
+    query text;
+BEGIN
+    IF partition_date IS NULL THEN
+        RAISE EXCEPTION 'partition_date parameter cannot be NULL';
+    END IF;
+
+    SELECT to_char(partition_date, 'YYYYMMDD') INTO partition_date_str;
+    SELECT format('v1_payloads_olap_%s', partition_date_str) INTO source_partition_name;
+
+    IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = source_partition_name) THEN
+        RAISE EXCEPTION 'Partition % does not exist', source_partition_name;
+    END IF;
+
+    query := format('
+        WITH paginated AS (
+            SELECT tenant_id, external_id, inserted_at, ROW_NUMBER() OVER (ORDER BY tenant_id, external_id, inserted_at) AS rn
+            FROM %I
+            WHERE (tenant_id, external_id, inserted_at) > ($1, $2, $3)
+            ORDER BY tenant_id, external_id, inserted_at
+            LIMIT $4::INTEGER
+        ), lower_bounds AS (
+            SELECT (rn - 1)::INTEGER / $5::INTEGER AS batch_ix, tenant_id::UUID, external_id::UUID, inserted_at::TIMESTAMPTZ
+            FROM paginated
+            WHERE MOD(rn - 1, $5::INTEGER) = 0
+        ), upper_bounds AS (
+            SELECT
+                -- `batch_ix` is the zero-indexed chunk number, computed as `(rn - 1) / chunk_size` with integer division,
+                -- exactly as in `lower_bounds`, so that the lower and upper bound of each chunk share the same index.
+                -- E.g. with 5 rows in the window and a chunk size of 2, the upper bound rows are rn = 2, 4 and 5, which
+                -- map to batches 0, 1 and 2. Subtracting 1 before dividing (rather than using `MOD(rn, chunk_size) = 1`
+                -- and `CEIL`) also keeps a chunk size of 1 working, where every row is both a lower and an upper bound.
+                (rn - 1)::INTEGER / $5::INTEGER AS batch_ix,
+                tenant_id::UUID,
+                external_id::UUID,
+                inserted_at::TIMESTAMPTZ
+            FROM paginated
+            -- We want to include either the last row of each batch, or the last row of the entire paginated set, which may not line up with a batch end.
+            WHERE MOD(rn, $5::INTEGER) = 0 OR rn = (SELECT MAX(rn) FROM paginated)
+        )
+
+        SELECT
+            lb.tenant_id AS lower_tenant_id,
+            lb.external_id AS lower_external_id,
+            lb.inserted_at AS lower_inserted_at,
+            ub.tenant_id AS upper_tenant_id,
+            ub.external_id AS upper_external_id,
+            ub.inserted_at AS upper_inserted_at
+        FROM lower_bounds lb
+        JOIN upper_bounds ub ON lb.batch_ix = ub.batch_ix
+        ORDER BY lb.tenant_id, lb.external_id, lb.inserted_at
+    ', source_partition_name);
+
+    RETURN QUERY EXECUTE query USING last_tenant_id, last_external_id, last_inserted_at, window_size, chunk_size;
 END;
 $$;
 