mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2025-12-21 08:40:10 -06:00
Fix: Payload List Index Performance (#2669)
* feat: update core func
* feat: migration to update function
* fix: pass batch size through
* fix: limit
* feat: materialized cte
* chore: comment
This commit is contained in:
@@ -0,0 +1,233 @@
|
||||
-- +goose Up
|
||||
-- +goose StatementBegin
|
||||
-- The new version adds a trailing batch_size argument. A different argument list
-- is a different function signature, so the old 9-argument version is dropped
-- explicitly. IF EXISTS keeps the migration re-runnable when it is already gone.
DROP FUNCTION IF EXISTS list_paginated_payloads_for_offload(date, uuid, timestamptz, bigint, v1_payload_type, uuid, timestamptz, bigint, v1_payload_type);

-- Lists rows from the daily partition v1_payload_<YYYYMMDD> whose key
-- (tenant_id, inserted_at, id, type) lies between an inclusive lower bound
-- (last_*) and an inclusive upper bound (next_*), ordered by that key.
--
-- Parameters:
--   partition_date  day of the partition to read; must not be NULL
--   last_*          inclusive lower bound of the key range
--   next_*          inclusive upper bound of the key range
--   batch_size      the scan reads at most 2 * batch_size rows starting at the
--                   lower bound before the upper bound is applied, so at most
--                   2 * batch_size rows are returned
--
-- Raises an exception when partition_date is NULL or when no table with the
-- partition name is listed in pg_tables.
CREATE OR REPLACE FUNCTION list_paginated_payloads_for_offload(
    partition_date date,
    last_tenant_id uuid,
    last_inserted_at timestamptz,
    last_id bigint,
    last_type v1_payload_type,
    next_tenant_id uuid,
    next_inserted_at timestamptz,
    next_id bigint,
    next_type v1_payload_type,
    batch_size integer
) RETURNS TABLE (
    tenant_id UUID,
    id BIGINT,
    inserted_at TIMESTAMPTZ,
    external_id UUID,
    type v1_payload_type,
    location v1_payload_location,
    external_location_key TEXT,
    inline_content JSONB,
    updated_at TIMESTAMPTZ
)
LANGUAGE plpgsql AS
$$
DECLARE
    partition_date_str varchar;
    source_partition_name varchar;
    query text;
BEGIN
    IF partition_date IS NULL THEN
        RAISE EXCEPTION 'partition_date parameter cannot be NULL';
    END IF;

    partition_date_str := to_char(partition_date, 'YYYYMMDD');
    source_partition_name := format('v1_payload_%s', partition_date_str);

    -- NOTE(review): pg_tables is matched on tablename only (no schemaname filter),
    -- while the query below uses the unqualified name. Confirm the partitions
    -- only ever live in a single schema.
    IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = source_partition_name) THEN
        RAISE EXCEPTION 'Partition % does not exist', source_partition_name;
    END IF;

    -- The candidate set is limited to twice the batch size to handle an edge case:
    -- a row can be inserted ahead of one of our rows between the moment the chunks
    -- are computed and the moment they are selected. Reading a window of
    -- 2 * batch_size rows makes it much less likely that such an insert pushes an
    -- expected row out of the window. The multiplication is done in bigint so a
    -- large batch_size cannot overflow the integer range.
    query := format('
        WITH candidates AS MATERIALIZED (
            SELECT tenant_id, id, inserted_at, external_id, type, location,
                external_location_key, inline_content, updated_at
            FROM %I
            WHERE
                (tenant_id, inserted_at, id, type) >= ($1, $2, $3, $4)
            ORDER BY tenant_id, inserted_at, id, type
            LIMIT $9::bigint * 2
        )

        SELECT tenant_id, id, inserted_at, external_id, type, location,
            external_location_key, inline_content, updated_at
        FROM candidates
        WHERE
            (tenant_id, inserted_at, id, type) >= ($1, $2, $3, $4)
            AND (tenant_id, inserted_at, id, type) <= ($5, $6, $7, $8)
        ORDER BY tenant_id, inserted_at, id, type
    ', source_partition_name);

    RETURN QUERY EXECUTE query USING last_tenant_id, last_inserted_at, last_id, last_type, next_tenant_id, next_inserted_at, next_id, next_type, batch_size;
END;
$$;
|
||||
|
||||
-- The new version adds a trailing batch_size argument. A different argument list
-- is a different function signature, so the old 7-argument version is dropped
-- explicitly. IF EXISTS keeps the migration re-runnable when it is already gone.
DROP FUNCTION IF EXISTS list_paginated_olap_payloads_for_offload(date, uuid, uuid, timestamptz, uuid, uuid, timestamptz);

-- Lists rows from the daily partition v1_payloads_olap_<YYYYMMDD> whose key
-- (tenant_id, external_id, inserted_at) lies between an inclusive lower bound
-- (last_*) and an inclusive upper bound (next_*), ordered by that key.
--
-- Parameters:
--   partition_date  day of the partition to read; must not be NULL
--   last_*          inclusive lower bound of the key range
--   next_*          inclusive upper bound of the key range
--   batch_size      the scan reads at most 2 * batch_size rows starting at the
--                   lower bound before the upper bound is applied, so at most
--                   2 * batch_size rows are returned
--
-- Raises an exception when partition_date is NULL or when no table with the
-- partition name is listed in pg_tables.
CREATE OR REPLACE FUNCTION list_paginated_olap_payloads_for_offload(
    partition_date date,
    last_tenant_id uuid,
    last_external_id uuid,
    last_inserted_at timestamptz,
    next_tenant_id uuid,
    next_external_id uuid,
    next_inserted_at timestamptz,
    batch_size integer
) RETURNS TABLE (
    tenant_id UUID,
    external_id UUID,
    location v1_payload_location_olap,
    external_location_key TEXT,
    inline_content JSONB,
    inserted_at TIMESTAMPTZ,
    updated_at TIMESTAMPTZ
)
LANGUAGE plpgsql AS
$$
DECLARE
    partition_date_str varchar;
    source_partition_name varchar;
    query text;
BEGIN
    IF partition_date IS NULL THEN
        RAISE EXCEPTION 'partition_date parameter cannot be NULL';
    END IF;

    partition_date_str := to_char(partition_date, 'YYYYMMDD');
    source_partition_name := format('v1_payloads_olap_%s', partition_date_str);

    -- NOTE(review): pg_tables is matched on tablename only (no schemaname filter),
    -- while the query below uses the unqualified name. Confirm the partitions
    -- only ever live in a single schema.
    IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = source_partition_name) THEN
        RAISE EXCEPTION 'Partition % does not exist', source_partition_name;
    END IF;

    -- The candidate set is limited to twice the batch size to handle an edge case:
    -- a row can be inserted ahead of one of our rows between the moment the chunks
    -- are computed and the moment they are selected. Reading a window of
    -- 2 * batch_size rows makes it much less likely that such an insert pushes an
    -- expected row out of the window. The multiplication is done in bigint so a
    -- large batch_size cannot overflow the integer range.
    query := format('
        WITH candidates AS MATERIALIZED (
            SELECT tenant_id, external_id, location, external_location_key, inline_content, inserted_at, updated_at
            FROM %I
            WHERE
                (tenant_id, external_id, inserted_at) >= ($1, $2, $3)
            ORDER BY tenant_id, external_id, inserted_at
            LIMIT $7::bigint * 2
        )

        SELECT tenant_id, external_id, location, external_location_key, inline_content, inserted_at, updated_at
        FROM candidates
        WHERE
            (tenant_id, external_id, inserted_at) >= ($1, $2, $3)
            AND (tenant_id, external_id, inserted_at) <= ($4, $5, $6)
        ORDER BY tenant_id, external_id, inserted_at
    ', source_partition_name);

    RETURN QUERY EXECUTE query USING last_tenant_id, last_external_id, last_inserted_at, next_tenant_id, next_external_id, next_inserted_at, batch_size;
END;
$$;
|
||||
-- +goose StatementEnd
|
||||
|
||||
-- +goose Down
|
||||
-- +goose StatementBegin
|
||||
-- Rollback: remove the 10-argument (batch_size) version and restore the
-- 9-argument one. IF EXISTS keeps the rollback re-runnable when the newer
-- version is already gone.
DROP FUNCTION IF EXISTS list_paginated_payloads_for_offload(date, uuid, timestamptz, bigint, v1_payload_type, uuid, timestamptz, bigint, v1_payload_type, integer);

-- Lists rows from the daily partition v1_payload_<YYYYMMDD> whose key
-- (tenant_id, inserted_at, id, type) lies between an inclusive lower bound
-- (last_*) and an inclusive upper bound (next_*), ordered by that key.
-- This version applies no row limit: every row in the range is returned.
--
-- Raises an exception when partition_date is NULL or when no table with the
-- partition name is listed in pg_tables.
CREATE OR REPLACE FUNCTION list_paginated_payloads_for_offload(
    partition_date date,
    last_tenant_id uuid,
    last_inserted_at timestamptz,
    last_id bigint,
    last_type v1_payload_type,
    next_tenant_id uuid,
    next_inserted_at timestamptz,
    next_id bigint,
    next_type v1_payload_type
) RETURNS TABLE (
    tenant_id UUID,
    id BIGINT,
    inserted_at TIMESTAMPTZ,
    external_id UUID,
    type v1_payload_type,
    location v1_payload_location,
    external_location_key TEXT,
    inline_content JSONB,
    updated_at TIMESTAMPTZ
)
LANGUAGE plpgsql AS
$$
DECLARE
    partition_date_str varchar;
    source_partition_name varchar;
    query text;
BEGIN
    IF partition_date IS NULL THEN
        RAISE EXCEPTION 'partition_date parameter cannot be NULL';
    END IF;

    partition_date_str := to_char(partition_date, 'YYYYMMDD');
    source_partition_name := format('v1_payload_%s', partition_date_str);

    -- NOTE(review): pg_tables is matched on tablename only (no schemaname filter),
    -- while the query below uses the unqualified name. Confirm the partitions
    -- only ever live in a single schema.
    IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = source_partition_name) THEN
        RAISE EXCEPTION 'Partition % does not exist', source_partition_name;
    END IF;

    -- %I quotes the partition name as an identifier; the bounds are bound as
    -- parameters through USING rather than interpolated into the SQL text.
    query := format('
        SELECT tenant_id, id, inserted_at, external_id, type, location,
            external_location_key, inline_content, updated_at
        FROM %I
        WHERE
            (tenant_id, inserted_at, id, type) >= ($1, $2, $3, $4)
            AND (tenant_id, inserted_at, id, type) <= ($5, $6, $7, $8)
        ORDER BY tenant_id, inserted_at, id, type
    ', source_partition_name);

    RETURN QUERY EXECUTE query USING last_tenant_id, last_inserted_at, last_id, last_type, next_tenant_id, next_inserted_at, next_id, next_type;
END;
$$;
|
||||
|
||||
|
||||
-- Rollback: remove the 8-argument (batch_size) version and restore the
-- 7-argument one. IF EXISTS keeps the rollback re-runnable when the newer
-- version is already gone.
DROP FUNCTION IF EXISTS list_paginated_olap_payloads_for_offload(date, uuid, uuid, timestamptz, uuid, uuid, timestamptz, integer);

-- Lists rows from the daily partition v1_payloads_olap_<YYYYMMDD> whose key
-- (tenant_id, external_id, inserted_at) lies between an inclusive lower bound
-- (last_*) and an inclusive upper bound (next_*), ordered by that key.
-- This version applies no row limit: every row in the range is returned.
--
-- Raises an exception when partition_date is NULL or when no table with the
-- partition name is listed in pg_tables.
CREATE OR REPLACE FUNCTION list_paginated_olap_payloads_for_offload(
    partition_date date,
    last_tenant_id uuid,
    last_external_id uuid,
    last_inserted_at timestamptz,
    next_tenant_id uuid,
    next_external_id uuid,
    next_inserted_at timestamptz
) RETURNS TABLE (
    tenant_id UUID,
    external_id UUID,
    location v1_payload_location_olap,
    external_location_key TEXT,
    inline_content JSONB,
    inserted_at TIMESTAMPTZ,
    updated_at TIMESTAMPTZ
)
LANGUAGE plpgsql AS
$$
DECLARE
    partition_date_str varchar;
    source_partition_name varchar;
    query text;
BEGIN
    IF partition_date IS NULL THEN
        RAISE EXCEPTION 'partition_date parameter cannot be NULL';
    END IF;

    partition_date_str := to_char(partition_date, 'YYYYMMDD');
    source_partition_name := format('v1_payloads_olap_%s', partition_date_str);

    -- NOTE(review): pg_tables is matched on tablename only (no schemaname filter),
    -- while the query below uses the unqualified name. Confirm the partitions
    -- only ever live in a single schema.
    IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = source_partition_name) THEN
        RAISE EXCEPTION 'Partition % does not exist', source_partition_name;
    END IF;

    -- %I quotes the partition name as an identifier; the bounds are bound as
    -- parameters through USING rather than interpolated into the SQL text.
    query := format('
        SELECT tenant_id, external_id, location, external_location_key, inline_content, inserted_at, updated_at
        FROM %I
        WHERE
            (tenant_id, external_id, inserted_at) >= ($1, $2, $3)
            AND (tenant_id, external_id, inserted_at) <= ($4, $5, $6)
        ORDER BY tenant_id, external_id, inserted_at
    ', source_partition_name);

    RETURN QUERY EXECUTE query USING last_tenant_id, last_external_id, last_inserted_at, next_tenant_id, next_external_id, next_inserted_at;
END;
$$;
|
||||
-- +goose StatementEnd
|
||||
@@ -2756,6 +2756,7 @@ func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context,
|
||||
Nexttenantid: pr.UpperTenantID,
|
||||
Nextexternalid: pr.UpperExternalID,
|
||||
Nextinsertedat: pr.UpperInsertedAt,
|
||||
Batchsize: externalCutoverBatchSize,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
|
||||
@@ -489,6 +489,7 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Cont
|
||||
Nextinsertedat: pr.UpperInsertedAt,
|
||||
Nextid: pr.UpperID,
|
||||
Nexttype: pr.UpperType,
|
||||
Batchsize: p.externalCutoverBatchSize,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
|
||||
@@ -1856,7 +1856,8 @@ WITH payloads AS (
|
||||
@lastInsertedAt::TIMESTAMPTZ,
|
||||
@nextTenantId::UUID,
|
||||
@nextExternalId::UUID,
|
||||
@nextInsertedAt::TIMESTAMPTZ
|
||||
@nextInsertedAt::TIMESTAMPTZ,
|
||||
@batchSize::INTEGER
|
||||
) p
|
||||
)
|
||||
SELECT
|
||||
|
||||
@@ -1335,7 +1335,8 @@ WITH payloads AS (
|
||||
$4::TIMESTAMPTZ,
|
||||
$5::UUID,
|
||||
$6::UUID,
|
||||
$7::TIMESTAMPTZ
|
||||
$7::TIMESTAMPTZ,
|
||||
$8::INTEGER
|
||||
) p
|
||||
)
|
||||
SELECT
|
||||
@@ -1357,6 +1358,7 @@ type ListPaginatedOLAPPayloadsForOffloadParams struct {
|
||||
Nexttenantid pgtype.UUID `json:"nexttenantid"`
|
||||
Nextexternalid pgtype.UUID `json:"nextexternalid"`
|
||||
Nextinsertedat pgtype.Timestamptz `json:"nextinsertedat"`
|
||||
Batchsize int32 `json:"batchsize"`
|
||||
}
|
||||
|
||||
type ListPaginatedOLAPPayloadsForOffloadRow struct {
|
||||
@@ -1378,6 +1380,7 @@ func (q *Queries) ListPaginatedOLAPPayloadsForOffload(ctx context.Context, db DB
|
||||
arg.Nexttenantid,
|
||||
arg.Nextexternalid,
|
||||
arg.Nextinsertedat,
|
||||
arg.Batchsize,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -229,7 +229,8 @@ WITH payloads AS (
|
||||
@nextTenantId::UUID,
|
||||
@nextInsertedAt::TIMESTAMPTZ,
|
||||
@nextId::BIGINT,
|
||||
@nextType::v1_payload_type
|
||||
@nextType::v1_payload_type,
|
||||
@batchSize::INTEGER
|
||||
) p
|
||||
)
|
||||
SELECT
|
||||
|
||||
@@ -268,7 +268,8 @@ WITH payloads AS (
|
||||
$6::UUID,
|
||||
$7::TIMESTAMPTZ,
|
||||
$8::BIGINT,
|
||||
$9::v1_payload_type
|
||||
$9::v1_payload_type,
|
||||
$10::INTEGER
|
||||
) p
|
||||
)
|
||||
SELECT
|
||||
@@ -294,6 +295,7 @@ type ListPaginatedPayloadsForOffloadParams struct {
|
||||
Nextinsertedat pgtype.Timestamptz `json:"nextinsertedat"`
|
||||
Nextid int64 `json:"nextid"`
|
||||
Nexttype V1PayloadType `json:"nexttype"`
|
||||
Batchsize int32 `json:"batchsize"`
|
||||
}
|
||||
|
||||
type ListPaginatedPayloadsForOffloadRow struct {
|
||||
@@ -319,6 +321,7 @@ func (q *Queries) ListPaginatedPayloadsForOffload(ctx context.Context, db DBTX,
|
||||
arg.Nextinsertedat,
|
||||
arg.Nextid,
|
||||
arg.Nexttype,
|
||||
arg.Batchsize,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -1885,7 +1885,8 @@ CREATE OR REPLACE FUNCTION list_paginated_payloads_for_offload(
|
||||
next_tenant_id uuid,
|
||||
next_inserted_at timestamptz,
|
||||
next_id bigint,
|
||||
next_type v1_payload_type
|
||||
next_type v1_payload_type,
|
||||
batch_size integer
|
||||
) RETURNS TABLE (
|
||||
tenant_id UUID,
|
||||
id BIGINT,
|
||||
@@ -1916,16 +1917,32 @@ BEGIN
|
||||
END IF;
|
||||
|
||||
query := format('
|
||||
WITH candidates AS MATERIALIZED (
|
||||
SELECT tenant_id, id, inserted_at, external_id, type, location,
|
||||
external_location_key, inline_content, updated_at
|
||||
FROM %I
|
||||
WHERE
|
||||
(tenant_id, inserted_at, id, type) >= ($1, $2, $3, $4)
|
||||
ORDER BY tenant_id, inserted_at, id, type
|
||||
|
||||
-- Multiplying by two here to handle an edge case. There is a small chance we miss a row
|
||||
-- when a different row is inserted before it, in between us creating the chunks and selecting
|
||||
-- them. By multiplying by two to create a "candidate" set, we significantly reduce the chance of us missing
|
||||
-- rows in this way, since if a row is inserted before one of our last rows, we will still have
|
||||
-- the next row after it in the candidate set.
|
||||
LIMIT $9 * 2
|
||||
)
|
||||
|
||||
SELECT tenant_id, id, inserted_at, external_id, type, location,
|
||||
external_location_key, inline_content, updated_at
|
||||
FROM %I
|
||||
FROM candidates
|
||||
WHERE
|
||||
(tenant_id, inserted_at, id, type) >= ($1, $2, $3, $4)
|
||||
AND (tenant_id, inserted_at, id, type) <= ($5, $6, $7, $8)
|
||||
ORDER BY tenant_id, inserted_at, id, type
|
||||
', source_partition_name);
|
||||
|
||||
RETURN QUERY EXECUTE query USING last_tenant_id, last_inserted_at, last_id, last_type, next_tenant_id, next_inserted_at, next_id, next_type;
|
||||
RETURN QUERY EXECUTE query USING last_tenant_id, last_inserted_at, last_id, last_type, next_tenant_id, next_inserted_at, next_id, next_type, batch_size;
|
||||
END;
|
||||
$$;
|
||||
|
||||
|
||||
@@ -925,7 +925,8 @@ CREATE OR REPLACE FUNCTION list_paginated_olap_payloads_for_offload(
|
||||
last_inserted_at timestamptz,
|
||||
next_tenant_id uuid,
|
||||
next_external_id uuid,
|
||||
next_inserted_at timestamptz
|
||||
next_inserted_at timestamptz,
|
||||
batch_size integer
|
||||
) RETURNS TABLE (
|
||||
tenant_id UUID,
|
||||
external_id UUID,
|
||||
@@ -954,15 +955,30 @@ BEGIN
|
||||
END IF;
|
||||
|
||||
query := format('
|
||||
WITH candidates AS MATERIALIZED (
|
||||
SELECT tenant_id, external_id, location, external_location_key, inline_content, inserted_at, updated_at
|
||||
FROM %I
|
||||
WHERE
|
||||
(tenant_id, external_id, inserted_at) >= ($1, $2, $3)
|
||||
ORDER BY tenant_id, external_id, inserted_at
|
||||
|
||||
-- Multiplying by two here to handle an edge case. There is a small chance we miss a row
|
||||
-- when a different row is inserted before it, in between us creating the chunks and selecting
|
||||
-- them. By multiplying by two to create a "candidate" set, we significantly reduce the chance of us missing
|
||||
-- rows in this way, since if a row is inserted before one of our last rows, we will still have
|
||||
-- the next row after it in the candidate set.
|
||||
LIMIT $7 * 2
|
||||
)
|
||||
|
||||
SELECT tenant_id, external_id, location, external_location_key, inline_content, inserted_at, updated_at
|
||||
FROM %I
|
||||
FROM candidates
|
||||
WHERE
|
||||
(tenant_id, external_id, inserted_at) >= ($1, $2, $3)
|
||||
AND (tenant_id, external_id, inserted_at) <= ($4, $5, $6)
|
||||
ORDER BY tenant_id, external_id, inserted_at
|
||||
', source_partition_name);
|
||||
|
||||
RETURN QUERY EXECUTE query USING last_tenant_id, last_external_id, last_inserted_at, next_tenant_id, next_external_id, next_inserted_at;
|
||||
RETURN QUERY EXECUTE query USING last_tenant_id, last_external_id, last_inserted_at, next_tenant_id, next_external_id, next_inserted_at, batch_size;
|
||||
END;
|
||||
$$;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user