Fix: Don't reset offset if a new process acquires lease (#2628)

* fix: don't reset offset if a new process acquires the lease

* fix: copy paste

* feat: migration, fix queries

* fix: more queries

* fix: down migration

* fix: comment

* feat: finish wiring up everything else

* fix: placeholder initial type

* fix: zero values everywhere

* fix: param ordering

* fix: handle no rows

* fix: zero values

* fix: limit

* fix: simplify

* fix: better defaults
This commit is contained in:
matt
2025-12-09 19:01:51 -05:00
committed by GitHub
parent 65bd347e94
commit 3ff672ebe4
12 changed files with 577 additions and 118 deletions

View File

@@ -0,0 +1,222 @@
-- +goose Up
-- +goose StatementBegin
ALTER TABLE v1_payload_cutover_job_offset
ADD COLUMN last_tenant_id UUID NOT NULL DEFAULT '00000000-0000-0000-0000-000000000000'::UUID,
ADD COLUMN last_inserted_at TIMESTAMPTZ NOT NULL DEFAULT '1970-01-01 00:00:00+00',
ADD COLUMN last_id BIGINT NOT NULL DEFAULT 0,
ADD COLUMN last_type v1_payload_type NOT NULL DEFAULT 'TASK_INPUT',
DROP COLUMN last_offset;
ALTER TABLE v1_payloads_olap_cutover_job_offset
ADD COLUMN last_tenant_id UUID NOT NULL DEFAULT '00000000-0000-0000-0000-000000000000'::UUID,
ADD COLUMN last_external_id UUID NOT NULL DEFAULT '00000000-0000-0000-0000-000000000000'::UUID,
ADD COLUMN last_inserted_at TIMESTAMPTZ NOT NULL DEFAULT '1970-01-01 00:00:00+00',
DROP COLUMN last_offset;
-- need to explicitly drop and replace because of the changes to the params
DROP FUNCTION IF EXISTS list_paginated_payloads_for_offload(date, int, bigint);
DROP FUNCTION IF EXISTS list_paginated_olap_payloads_for_offload(date, int, bigint);
CREATE OR REPLACE FUNCTION list_paginated_payloads_for_offload(
partition_date date,
limit_param int,
last_tenant_id uuid,
last_inserted_at timestamptz,
last_id bigint,
last_type v1_payload_type
) RETURNS TABLE (
tenant_id UUID,
id BIGINT,
inserted_at TIMESTAMPTZ,
external_id UUID,
type v1_payload_type,
location v1_payload_location,
external_location_key TEXT,
inline_content JSONB,
updated_at TIMESTAMPTZ
)
LANGUAGE plpgsql AS
$$
DECLARE
partition_date_str varchar;
source_partition_name varchar;
query text;
BEGIN
IF partition_date IS NULL THEN
RAISE EXCEPTION 'partition_date parameter cannot be NULL';
END IF;
SELECT to_char(partition_date, 'YYYYMMDD') INTO partition_date_str;
SELECT format('v1_payload_%s', partition_date_str) INTO source_partition_name;
IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = source_partition_name) THEN
RAISE EXCEPTION 'Partition % does not exist', source_partition_name;
END IF;
query := format('
SELECT tenant_id, id, inserted_at, external_id, type, location,
external_location_key, inline_content, updated_at
FROM %I
WHERE (tenant_id, inserted_at, id, type) > ($1, $2, $3, $4)
ORDER BY tenant_id, inserted_at, id, type
LIMIT $5
', source_partition_name);
RETURN QUERY EXECUTE query USING last_tenant_id, last_inserted_at, last_id, last_type, limit_param;
END;
$$;
CREATE OR REPLACE FUNCTION list_paginated_olap_payloads_for_offload(
partition_date date,
limit_param int,
last_tenant_id uuid,
last_external_id uuid,
last_inserted_at timestamptz
) RETURNS TABLE (
tenant_id UUID,
external_id UUID,
location v1_payload_location_olap,
external_location_key TEXT,
inline_content JSONB,
inserted_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ
)
LANGUAGE plpgsql AS
$$
DECLARE
partition_date_str varchar;
source_partition_name varchar;
query text;
BEGIN
IF partition_date IS NULL THEN
RAISE EXCEPTION 'partition_date parameter cannot be NULL';
END IF;
SELECT to_char(partition_date, 'YYYYMMDD') INTO partition_date_str;
SELECT format('v1_payloads_olap_%s', partition_date_str) INTO source_partition_name;
IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = source_partition_name) THEN
RAISE EXCEPTION 'Partition % does not exist', source_partition_name;
END IF;
query := format('
SELECT tenant_id, external_id, location, external_location_key, inline_content, inserted_at, updated_at
FROM %I
WHERE (tenant_id, external_id, inserted_at) > ($1, $2, $3)
ORDER BY tenant_id, external_id, inserted_at
LIMIT $4
', source_partition_name);
RETURN QUERY EXECUTE query USING last_tenant_id, last_external_id, last_inserted_at, limit_param;
END;
$$;
-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
ALTER TABLE v1_payload_cutover_job_offset
ADD COLUMN last_offset BIGINT NOT NULL DEFAULT 0,
DROP COLUMN last_tenant_id,
DROP COLUMN last_inserted_at,
DROP COLUMN last_id,
DROP COLUMN last_type;
ALTER TABLE v1_payloads_olap_cutover_job_offset
ADD COLUMN last_offset BIGINT NOT NULL DEFAULT 0,
DROP COLUMN last_tenant_id,
DROP COLUMN last_external_id,
DROP COLUMN last_inserted_at
;
DROP FUNCTION IF EXISTS list_paginated_payloads_for_offload(date, int, uuid, timestamptz, bigint, v1_payload_type);
DROP FUNCTION IF EXISTS list_paginated_olap_payloads_for_offload(date, int, uuid, uuid, timestamptz);
CREATE OR REPLACE FUNCTION list_paginated_olap_payloads_for_offload(
partition_date date,
limit_param int,
offset_param bigint
) RETURNS TABLE (
tenant_id UUID,
external_id UUID,
location v1_payload_location_olap,
external_location_key TEXT,
inline_content JSONB,
inserted_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ
)
LANGUAGE plpgsql AS
$$
DECLARE
partition_date_str varchar;
source_partition_name varchar;
query text;
BEGIN
IF partition_date IS NULL THEN
RAISE EXCEPTION 'partition_date parameter cannot be NULL';
END IF;
SELECT to_char(partition_date, 'YYYYMMDD') INTO partition_date_str;
SELECT format('v1_payloads_olap_%s', partition_date_str) INTO source_partition_name;
IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = source_partition_name) THEN
RAISE EXCEPTION 'Partition % does not exist', source_partition_name;
END IF;
query := format('
SELECT tenant_id, external_id, location, external_location_key, inline_content, inserted_at, updated_at
FROM %I
ORDER BY tenant_id, external_id, inserted_at
LIMIT $1
OFFSET $2
', source_partition_name);
RETURN QUERY EXECUTE query USING limit_param, offset_param;
END;
$$;
CREATE OR REPLACE FUNCTION list_paginated_payloads_for_offload(
partition_date date,
limit_param int,
offset_param bigint
) RETURNS TABLE (
tenant_id UUID,
id BIGINT,
inserted_at TIMESTAMPTZ,
external_id UUID,
type v1_payload_type,
location v1_payload_location,
external_location_key TEXT,
inline_content JSONB,
updated_at TIMESTAMPTZ
)
LANGUAGE plpgsql AS
$$
DECLARE
partition_date_str varchar;
source_partition_name varchar;
query text;
BEGIN
IF partition_date IS NULL THEN
RAISE EXCEPTION 'partition_date parameter cannot be NULL';
END IF;
SELECT to_char(partition_date, 'YYYYMMDD') INTO partition_date_str;
SELECT format('v1_payload_%s', partition_date_str) INTO source_partition_name;
IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = source_partition_name) THEN
RAISE EXCEPTION 'Partition % does not exist', source_partition_name;
END IF;
query := format('
SELECT tenant_id, id, inserted_at, external_id, type, location,
external_location_key, inline_content, updated_at
FROM %I
ORDER BY tenant_id, inserted_at, id, type
LIMIT $1
OFFSET $2
', source_partition_name);
RETURN QUERY EXECUTE query USING limit_param, offset_param;
END;
$$;
-- +goose StatementEnd

View File

@@ -2684,7 +2684,26 @@ type BulkCutOverOLAPPayload struct {
ExternalLocationKey ExternalPayloadLocationKey
}
func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context, processId pgtype.UUID, partitionDate PartitionDate, offset int64, externalCutoverBatchSize int32) (*CutoverBatchOutcome, error) {
type OLAPPaginationParams struct {
LastTenantId pgtype.UUID
LastInsertedAt pgtype.Timestamptz
LastExternalId pgtype.UUID
Limit int32
}
type OLAPCutoverJobRunMetadata struct {
ShouldRun bool
Pagination OLAPPaginationParams
PartitionDate PartitionDate
LeaseProcessId pgtype.UUID
}
type OLAPCutoverBatchOutcome struct {
ShouldContinue bool
NextPagination OLAPPaginationParams
}
func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context, processId pgtype.UUID, partitionDate PartitionDate, pagination OLAPPaginationParams) (*OLAPCutoverBatchOutcome, error) {
tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l, 10000)
if err != nil {
@@ -2694,10 +2713,13 @@ func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context,
defer rollback()
tableName := fmt.Sprintf("v1_payloads_olap_offload_tmp_%s", partitionDate.String())
payloads, err := p.queries.ListPaginatedOLAPPayloadsForOffload(ctx, tx, sqlcv1.ListPaginatedOLAPPayloadsForOffloadParams{
Partitiondate: pgtype.Date(partitionDate),
Offsetparam: offset,
Limitparam: externalCutoverBatchSize,
Partitiondate: pgtype.Date(partitionDate),
Lasttenantid: pagination.LastTenantId,
Lastexternalid: pagination.LastExternalId,
Lastinsertedat: pagination.LastInsertedAt,
Limitparam: pagination.Limit,
})
if err != nil {
@@ -2747,15 +2769,27 @@ func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context,
})
}
_, err = sqlcv1.InsertCutOverOLAPPayloadsIntoTempTable(ctx, tx, tableName, payloadsToInsert)
insertResult, err := sqlcv1.InsertCutOverOLAPPayloadsIntoTempTable(ctx, tx, tableName, payloadsToInsert)
if err != nil {
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
return nil, fmt.Errorf("failed to copy offloaded payloads into temp table: %w", err)
}
offset += int64(len(payloads))
isNoRows := errors.Is(err, pgx.ErrNoRows)
_, err = p.acquireOrExtendJobLease(ctx, tx, processId, partitionDate, offset)
params := OLAPPaginationParams{
LastTenantId: insertResult.TenantId,
LastInsertedAt: insertResult.InsertedAt,
LastExternalId: insertResult.ExternalId,
Limit: pagination.Limit,
}
// hack so that we don't have errors from zero values when no rows are returned
if isNoRows {
params = pagination
}
extendedLease, err := p.acquireOrExtendJobLease(ctx, tx, processId, partitionDate, params)
if err != nil {
return nil, fmt.Errorf("failed to extend cutover job lease: %w", err)
@@ -2765,26 +2799,28 @@ func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context,
return nil, fmt.Errorf("failed to commit copy offloaded payloads transaction: %w", err)
}
if len(payloads) < int(externalCutoverBatchSize) {
return &CutoverBatchOutcome{
if len(payloads) < int(pagination.Limit) || isNoRows {
return &OLAPCutoverBatchOutcome{
ShouldContinue: false,
NextOffset: offset,
NextPagination: extendedLease.Pagination,
}, nil
}
return &CutoverBatchOutcome{
return &OLAPCutoverBatchOutcome{
ShouldContinue: true,
NextOffset: offset,
NextPagination: extendedLease.Pagination,
}, nil
}
func (p *OLAPRepositoryImpl) acquireOrExtendJobLease(ctx context.Context, tx pgx.Tx, processId pgtype.UUID, partitionDate PartitionDate, offset int64) (*CutoverJobRunMetadata, error) {
func (p *OLAPRepositoryImpl) acquireOrExtendJobLease(ctx context.Context, tx pgx.Tx, processId pgtype.UUID, partitionDate PartitionDate, pagination OLAPPaginationParams) (*OLAPCutoverJobRunMetadata, error) {
leaseInterval := 2 * time.Minute
leaseExpiresAt := sqlchelpers.TimestamptzFromTime(time.Now().Add(leaseInterval))
lease, err := p.queries.AcquireOrExtendOLAPCutoverJobLease(ctx, tx, sqlcv1.AcquireOrExtendOLAPCutoverJobLeaseParams{
Key: pgtype.Date(partitionDate),
Lastoffset: offset,
Lasttenantid: pagination.LastTenantId,
Lastexternalid: pagination.LastExternalId,
Lastinsertedat: pagination.LastInsertedAt,
Leaseprocessid: processId,
Leaseexpiresat: leaseExpiresAt,
})
@@ -2793,9 +2829,8 @@ func (p *OLAPRepositoryImpl) acquireOrExtendJobLease(ctx context.Context, tx pgx
// ErrNoRows here means that something else is holding the lease
// since we did not insert a new record, and the `UPDATE` returned an empty set
if errors.Is(err, pgx.ErrNoRows) {
return &CutoverJobRunMetadata{
return &OLAPCutoverJobRunMetadata{
ShouldRun: false,
LastOffset: 0,
PartitionDate: partitionDate,
LeaseProcessId: processId,
}, nil
@@ -2804,23 +2839,33 @@ func (p *OLAPRepositoryImpl) acquireOrExtendJobLease(ctx context.Context, tx pgx
}
if lease.LeaseProcessID != processId || lease.IsCompleted {
return &CutoverJobRunMetadata{
ShouldRun: false,
LastOffset: lease.LastOffset,
return &OLAPCutoverJobRunMetadata{
ShouldRun: false,
Pagination: OLAPPaginationParams{
LastTenantId: lease.LastTenantID,
LastInsertedAt: lease.LastInsertedAt,
LastExternalId: lease.LastExternalID,
Limit: pagination.Limit,
},
PartitionDate: partitionDate,
LeaseProcessId: lease.LeaseProcessID,
}, nil
}
return &CutoverJobRunMetadata{
ShouldRun: true,
LastOffset: lease.LastOffset,
return &OLAPCutoverJobRunMetadata{
ShouldRun: true,
Pagination: OLAPPaginationParams{
LastTenantId: lease.LastTenantID,
LastInsertedAt: lease.LastInsertedAt,
LastExternalId: lease.LastExternalID,
Limit: pagination.Limit,
},
PartitionDate: partitionDate,
LeaseProcessId: processId,
}, nil
}
func (p *OLAPRepositoryImpl) prepareCutoverTableJob(ctx context.Context, processId pgtype.UUID, partitionDate PartitionDate, inlineStoreTTL *time.Duration) (*CutoverJobRunMetadata, error) {
func (p *OLAPRepositoryImpl) prepareCutoverTableJob(ctx context.Context, processId pgtype.UUID, partitionDate PartitionDate, inlineStoreTTL *time.Duration, externalCutoverBatchSize int32) (*OLAPCutoverJobRunMetadata, error) {
if inlineStoreTTL == nil {
return nil, fmt.Errorf("inline store TTL is not set")
}
@@ -2833,7 +2878,14 @@ func (p *OLAPRepositoryImpl) prepareCutoverTableJob(ctx context.Context, process
defer rollback()
lease, err := p.acquireOrExtendJobLease(ctx, tx, processId, partitionDate, 0)
var zeroUuid uuid.UUID
lease, err := p.acquireOrExtendJobLease(ctx, tx, processId, partitionDate, OLAPPaginationParams{
LastTenantId: sqlchelpers.UUIDFromStr(zeroUuid.String()),
LastExternalId: sqlchelpers.UUIDFromStr(zeroUuid.String()),
LastInsertedAt: sqlchelpers.TimestamptzFromTime(time.Unix(0, 0)),
Limit: externalCutoverBatchSize,
})
if err != nil {
return nil, fmt.Errorf("failed to acquire or extend cutover job lease: %w", err)
@@ -2843,8 +2895,6 @@ func (p *OLAPRepositoryImpl) prepareCutoverTableJob(ctx context.Context, process
return lease, nil
}
offset := lease.LastOffset
err = p.queries.CreateV1PayloadOLAPCutoverTemporaryTable(ctx, tx, pgtype.Date(partitionDate))
if err != nil {
@@ -2855,9 +2905,9 @@ func (p *OLAPRepositoryImpl) prepareCutoverTableJob(ctx context.Context, process
return nil, fmt.Errorf("failed to commit copy offloaded payloads transaction: %w", err)
}
return &CutoverJobRunMetadata{
return &OLAPCutoverJobRunMetadata{
ShouldRun: true,
LastOffset: offset,
Pagination: lease.Pagination,
PartitionDate: partitionDate,
LeaseProcessId: processId,
}, nil
@@ -2867,7 +2917,7 @@ func (p *OLAPRepositoryImpl) processSinglePartition(ctx context.Context, process
ctx, span := telemetry.NewSpan(ctx, "olap_repository.processSinglePartition")
defer span.End()
jobMeta, err := p.prepareCutoverTableJob(ctx, processId, partitionDate, inlineStoreTTL)
jobMeta, err := p.prepareCutoverTableJob(ctx, processId, partitionDate, inlineStoreTTL, externalCutoverBatchSize)
if err != nil {
return fmt.Errorf("failed to prepare cutover table job: %w", err)
@@ -2877,10 +2927,10 @@ func (p *OLAPRepositoryImpl) processSinglePartition(ctx context.Context, process
return nil
}
offset := jobMeta.LastOffset
pagination := jobMeta.Pagination
for {
outcome, err := p.processOLAPPayloadCutoverBatch(ctx, processId, partitionDate, offset, externalCutoverBatchSize)
outcome, err := p.processOLAPPayloadCutoverBatch(ctx, processId, partitionDate, pagination)
if err != nil {
return fmt.Errorf("failed to process payload cutover batch: %w", err)
@@ -2890,7 +2940,7 @@ func (p *OLAPRepositoryImpl) processSinglePartition(ctx context.Context, process
break
}
offset = outcome.NextOffset
pagination = outcome.NextPagination
}
tempPartitionName := fmt.Sprintf("v1_payloads_olap_offload_tmp_%s", partitionDate.String())

View File

@@ -402,9 +402,16 @@ type BulkCutOverPayload struct {
ExternalLocationKey ExternalPayloadLocationKey
}
type PaginationParams struct {
LastTenantID pgtype.UUID
LastInsertedAt pgtype.Timestamptz
LastID int64
LastType sqlcv1.V1PayloadType
}
type CutoverBatchOutcome struct {
ShouldContinue bool
NextOffset int64
NextPagination PaginationParams
}
type PartitionDate pgtype.Date
@@ -415,7 +422,7 @@ func (d PartitionDate) String() string {
const MAX_PARTITIONS_TO_OFFLOAD = 14 // two weeks
func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Context, processId pgtype.UUID, partitionDate PartitionDate, offset int64) (*CutoverBatchOutcome, error) {
func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Context, processId pgtype.UUID, partitionDate PartitionDate, pagination PaginationParams) (*CutoverBatchOutcome, error) {
tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l, 10000)
if err != nil {
@@ -426,9 +433,12 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Cont
tableName := fmt.Sprintf("v1_payload_offload_tmp_%s", partitionDate.String())
payloads, err := p.queries.ListPaginatedPayloadsForOffload(ctx, tx, sqlcv1.ListPaginatedPayloadsForOffloadParams{
Partitiondate: pgtype.Date(partitionDate),
Offsetparam: offset,
Limitparam: p.externalCutoverBatchSize,
Partitiondate: pgtype.Date(partitionDate),
Limitparam: p.externalCutoverBatchSize,
Lasttenantid: pagination.LastTenantID,
Lastinsertedat: pagination.LastInsertedAt,
Lastid: pagination.LastID,
Lasttype: pagination.LastType,
})
if err != nil {
@@ -483,15 +493,27 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Cont
})
}
_, err = sqlcv1.InsertCutOverPayloadsIntoTempTable(ctx, tx, tableName, payloadsToInsert)
inserted, err := sqlcv1.InsertCutOverPayloadsIntoTempTable(ctx, tx, tableName, payloadsToInsert)
if err != nil {
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
return nil, fmt.Errorf("failed to copy offloaded payloads into temp table: %w", err)
}
offset += int64(len(payloads))
isNoRows := errors.Is(err, pgx.ErrNoRows)
_, err = p.acquireOrExtendJobLease(ctx, tx, processId, partitionDate, offset)
params := PaginationParams{
LastTenantID: inserted.TenantId,
LastInsertedAt: inserted.InsertedAt,
LastID: inserted.ID,
LastType: inserted.Type,
}
// hack so that we don't have errors from zero values when no rows are returned
if isNoRows {
params = pagination
}
extendedLease, err := p.acquireOrExtendJobLease(ctx, tx, processId, partitionDate, params)
if err != nil {
return nil, fmt.Errorf("failed to extend cutover job lease: %w", err)
@@ -501,35 +523,38 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Cont
return nil, fmt.Errorf("failed to commit copy offloaded payloads transaction: %w", err)
}
if len(payloads) < int(p.externalCutoverBatchSize) {
if len(payloads) < int(p.externalCutoverBatchSize) || isNoRows {
return &CutoverBatchOutcome{
ShouldContinue: false,
NextOffset: offset,
NextPagination: extendedLease.Pagination,
}, nil
}
return &CutoverBatchOutcome{
ShouldContinue: true,
NextOffset: offset,
NextPagination: extendedLease.Pagination,
}, nil
}
type CutoverJobRunMetadata struct {
ShouldRun bool
LastOffset int64
Pagination PaginationParams
PartitionDate PartitionDate
LeaseProcessId pgtype.UUID
}
func (p *payloadStoreRepositoryImpl) acquireOrExtendJobLease(ctx context.Context, tx pgx.Tx, processId pgtype.UUID, partitionDate PartitionDate, offset int64) (*CutoverJobRunMetadata, error) {
func (p *payloadStoreRepositoryImpl) acquireOrExtendJobLease(ctx context.Context, tx pgx.Tx, processId pgtype.UUID, partitionDate PartitionDate, pagination PaginationParams) (*CutoverJobRunMetadata, error) {
leaseInterval := 2 * time.Minute
leaseExpiresAt := sqlchelpers.TimestamptzFromTime(time.Now().Add(leaseInterval))
lease, err := p.queries.AcquireOrExtendCutoverJobLease(ctx, tx, sqlcv1.AcquireOrExtendCutoverJobLeaseParams{
Key: pgtype.Date(partitionDate),
Lastoffset: offset,
Leaseprocessid: processId,
Leaseexpiresat: leaseExpiresAt,
Lasttenantid: pagination.LastTenantID,
Lastinsertedat: pagination.LastInsertedAt,
Lastid: pagination.LastID,
Lasttype: pagination.LastType,
})
if err != nil {
@@ -538,7 +563,6 @@ func (p *payloadStoreRepositoryImpl) acquireOrExtendJobLease(ctx context.Context
if errors.Is(err, pgx.ErrNoRows) {
return &CutoverJobRunMetadata{
ShouldRun: false,
LastOffset: 0,
PartitionDate: partitionDate,
LeaseProcessId: processId,
}, nil
@@ -548,16 +572,26 @@ func (p *payloadStoreRepositoryImpl) acquireOrExtendJobLease(ctx context.Context
if lease.LeaseProcessID != processId || lease.IsCompleted {
return &CutoverJobRunMetadata{
ShouldRun: false,
LastOffset: lease.LastOffset,
ShouldRun: false,
Pagination: PaginationParams{
LastTenantID: lease.LastTenantID,
LastInsertedAt: lease.LastInsertedAt,
LastID: lease.LastID,
LastType: lease.LastType,
},
PartitionDate: partitionDate,
LeaseProcessId: lease.LeaseProcessID,
}, nil
}
return &CutoverJobRunMetadata{
ShouldRun: true,
LastOffset: lease.LastOffset,
ShouldRun: true,
Pagination: PaginationParams{
LastTenantID: lease.LastTenantID,
LastInsertedAt: lease.LastInsertedAt,
LastID: lease.LastID,
LastType: lease.LastType,
},
PartitionDate: partitionDate,
LeaseProcessId: processId,
}, nil
@@ -576,7 +610,15 @@ func (p *payloadStoreRepositoryImpl) prepareCutoverTableJob(ctx context.Context,
defer rollback()
lease, err := p.acquireOrExtendJobLease(ctx, tx, processId, partitionDate, 0)
var zeroUuid uuid.UUID
lease, err := p.acquireOrExtendJobLease(ctx, tx, processId, partitionDate, PaginationParams{
// placeholder initial type
LastType: sqlcv1.V1PayloadTypeDAGINPUT,
LastTenantID: sqlchelpers.UUIDFromStr(zeroUuid.String()),
LastInsertedAt: sqlchelpers.TimestamptzFromTime(time.Unix(0, 0)),
LastID: 0,
})
if err != nil {
return nil, fmt.Errorf("failed to acquire or extend cutover job lease: %w", err)
@@ -586,8 +628,6 @@ func (p *payloadStoreRepositoryImpl) prepareCutoverTableJob(ctx context.Context,
return lease, nil
}
offset := lease.LastOffset
err = p.queries.CreateV1PayloadCutoverTemporaryTable(ctx, tx, pgtype.Date(partitionDate))
if err != nil {
@@ -600,7 +640,7 @@ func (p *payloadStoreRepositoryImpl) prepareCutoverTableJob(ctx context.Context,
return &CutoverJobRunMetadata{
ShouldRun: true,
LastOffset: offset,
Pagination: lease.Pagination,
PartitionDate: partitionDate,
LeaseProcessId: processId,
}, nil
@@ -620,10 +660,10 @@ func (p *payloadStoreRepositoryImpl) processSinglePartition(ctx context.Context,
return nil
}
offset := jobMeta.LastOffset
pagination := jobMeta.Pagination
for {
outcome, err := p.ProcessPayloadCutoverBatch(ctx, processId, partitionDate, offset)
outcome, err := p.ProcessPayloadCutoverBatch(ctx, processId, partitionDate, pagination)
if err != nil {
return fmt.Errorf("failed to process payload cutover batch: %w", err)
@@ -633,7 +673,7 @@ func (p *payloadStoreRepositoryImpl) processSinglePartition(ctx context.Context,
break
}
offset = outcome.NextOffset
pagination = outcome.NextPagination
}
tempPartitionName := fmt.Sprintf("v1_payload_offload_tmp_%s", partitionDate.String())

View File

@@ -3131,10 +3131,13 @@ type V1Payload struct {
type V1PayloadCutoverJobOffset struct {
Key pgtype.Date `json:"key"`
LastOffset int64 `json:"last_offset"`
IsCompleted bool `json:"is_completed"`
LeaseProcessID pgtype.UUID `json:"lease_process_id"`
LeaseExpiresAt pgtype.Timestamptz `json:"lease_expires_at"`
LastTenantID pgtype.UUID `json:"last_tenant_id"`
LastInsertedAt pgtype.Timestamptz `json:"last_inserted_at"`
LastID int64 `json:"last_id"`
LastType V1PayloadType `json:"last_type"`
}
type V1PayloadCutoverQueueItem struct {
@@ -3166,10 +3169,12 @@ type V1PayloadsOlap struct {
type V1PayloadsOlapCutoverJobOffset struct {
Key pgtype.Date `json:"key"`
LastOffset int64 `json:"last_offset"`
IsCompleted bool `json:"is_completed"`
LeaseProcessID pgtype.UUID `json:"lease_process_id"`
LeaseExpiresAt pgtype.Timestamptz `json:"lease_expires_at"`
LastTenantID pgtype.UUID `json:"last_tenant_id"`
LastExternalID pgtype.UUID `json:"last_external_id"`
LastInsertedAt pgtype.Timestamptz `json:"last_inserted_at"`
}
type V1Queue struct {

View File

@@ -454,7 +454,13 @@ type CutoverOLAPPayloadToInsert struct {
ExternalLocationKey string
}
func InsertCutOverOLAPPayloadsIntoTempTable(ctx context.Context, tx DBTX, tableName string, payloads []CutoverOLAPPayloadToInsert) (int64, error) {
type InsertCutOverOLAPPayloadsIntoTempTableRow struct {
TenantId pgtype.UUID
ExternalId pgtype.UUID
InsertedAt pgtype.Timestamptz
}
func InsertCutOverOLAPPayloadsIntoTempTable(ctx context.Context, tx DBTX, tableName string, payloads []CutoverOLAPPayloadToInsert) (*InsertCutOverOLAPPayloadsIntoTempTableRow, error) {
tenantIds := make([]pgtype.UUID, 0, len(payloads))
insertedAts := make([]pgtype.Timestamptz, 0, len(payloads))
externalIds := make([]pgtype.UUID, 0, len(payloads))
@@ -498,8 +504,10 @@ func InsertCutOverOLAPPayloadsIntoTempTable(ctx context.Context, tx DBTX, tableN
RETURNING *
)
SELECT COUNT(*)
SELECT tenant_id, external_id, inserted_at
FROM inserts
ORDER BY tenant_id DESC, external_id DESC, inserted_at DESC
LIMIT 1
`,
tableName,
),
@@ -510,10 +518,10 @@ func InsertCutOverOLAPPayloadsIntoTempTable(ctx context.Context, tx DBTX, tableN
externalLocationKeys,
)
var copyCount int64
err := row.Scan(&copyCount)
var insertRow InsertCutOverOLAPPayloadsIntoTempTableRow
err := row.Scan(&insertRow.TenantId, &insertRow.ExternalId, &insertRow.InsertedAt)
return copyCount, err
return &insertRow, err
}
func CompareOLAPPartitionRowCounts(ctx context.Context, tx DBTX, tempPartitionName, sourcePartitionName string) (bool, error) {

View File

@@ -1852,7 +1852,9 @@ WITH payloads AS (
FROM list_paginated_olap_payloads_for_offload(
@partitionDate::DATE,
@limitParam::INT,
@offsetParam::BIGINT
@lastTenantId::UUID,
@lastExternalId::UUID,
@lastInsertedAt::TIMESTAMPTZ
) p
)
SELECT
@@ -1872,11 +1874,25 @@ SELECT copy_v1_payloads_olap_partition_structure(@date::DATE);
SELECT swap_v1_payloads_olap_partition_with_temp(@date::DATE);
-- name: AcquireOrExtendOLAPCutoverJobLease :one
INSERT INTO v1_payloads_olap_cutover_job_offset (key, last_offset, lease_process_id, lease_expires_at)
VALUES (@key::DATE, @lastOffset::BIGINT, @leaseProcessId::UUID, @leaseExpiresAt::TIMESTAMPTZ)
INSERT INTO v1_payloads_olap_cutover_job_offset (key, lease_process_id, lease_expires_at, last_tenant_id, last_external_id, last_inserted_at)
VALUES (@key::DATE, @leaseProcessId::UUID, @leaseExpiresAt::TIMESTAMPTZ, @lastTenantId::UUID, @lastExternalId::UUID, @lastInsertedAt::TIMESTAMPTZ)
ON CONFLICT (key)
DO UPDATE SET
last_offset = EXCLUDED.last_offset,
-- if the lease is held by this process, then we extend the offset to the new value
-- otherwise it's a new process acquiring the lease, so we should keep the offset where it was before
last_tenant_id = CASE
WHEN EXCLUDED.lease_process_id = v1_payloads_olap_cutover_job_offset.lease_process_id THEN EXCLUDED.last_tenant_id
ELSE v1_payloads_olap_cutover_job_offset.last_tenant_id
END,
last_external_id = CASE
WHEN EXCLUDED.lease_process_id = v1_payloads_olap_cutover_job_offset.lease_process_id THEN EXCLUDED.last_external_id
ELSE v1_payloads_olap_cutover_job_offset.last_external_id
END,
last_inserted_at = CASE
WHEN EXCLUDED.lease_process_id = v1_payloads_olap_cutover_job_offset.lease_process_id THEN EXCLUDED.last_inserted_at
ELSE v1_payloads_olap_cutover_job_offset.last_inserted_at
END,
lease_process_id = EXCLUDED.lease_process_id,
lease_expires_at = EXCLUDED.lease_expires_at
WHERE v1_payloads_olap_cutover_job_offset.lease_expires_at < NOW() OR v1_payloads_olap_cutover_job_offset.lease_process_id = @leaseProcessId::UUID

View File

@@ -12,38 +12,58 @@ import (
)
const acquireOrExtendOLAPCutoverJobLease = `-- name: AcquireOrExtendOLAPCutoverJobLease :one
INSERT INTO v1_payloads_olap_cutover_job_offset (key, last_offset, lease_process_id, lease_expires_at)
VALUES ($1::DATE, $2::BIGINT, $3::UUID, $4::TIMESTAMPTZ)
INSERT INTO v1_payloads_olap_cutover_job_offset (key, lease_process_id, lease_expires_at, last_tenant_id, last_external_id, last_inserted_at)
VALUES ($1::DATE, $2::UUID, $3::TIMESTAMPTZ, $4::UUID, $5::UUID, $6::TIMESTAMPTZ)
ON CONFLICT (key)
DO UPDATE SET
last_offset = EXCLUDED.last_offset,
-- if the lease is held by this process, then we extend the offset to the new value
-- otherwise it's a new process acquiring the lease, so we should keep the offset where it was before
last_tenant_id = CASE
WHEN EXCLUDED.lease_process_id = v1_payloads_olap_cutover_job_offset.lease_process_id THEN EXCLUDED.last_tenant_id
ELSE v1_payloads_olap_cutover_job_offset.last_tenant_id
END,
last_external_id = CASE
WHEN EXCLUDED.lease_process_id = v1_payloads_olap_cutover_job_offset.lease_process_id THEN EXCLUDED.last_external_id
ELSE v1_payloads_olap_cutover_job_offset.last_external_id
END,
last_inserted_at = CASE
WHEN EXCLUDED.lease_process_id = v1_payloads_olap_cutover_job_offset.lease_process_id THEN EXCLUDED.last_inserted_at
ELSE v1_payloads_olap_cutover_job_offset.last_inserted_at
END,
lease_process_id = EXCLUDED.lease_process_id,
lease_expires_at = EXCLUDED.lease_expires_at
WHERE v1_payloads_olap_cutover_job_offset.lease_expires_at < NOW() OR v1_payloads_olap_cutover_job_offset.lease_process_id = $3::UUID
RETURNING key, last_offset, is_completed, lease_process_id, lease_expires_at
WHERE v1_payloads_olap_cutover_job_offset.lease_expires_at < NOW() OR v1_payloads_olap_cutover_job_offset.lease_process_id = $2::UUID
RETURNING key, is_completed, lease_process_id, lease_expires_at, last_tenant_id, last_external_id, last_inserted_at
`
type AcquireOrExtendOLAPCutoverJobLeaseParams struct {
Key pgtype.Date `json:"key"`
Lastoffset int64 `json:"lastoffset"`
Leaseprocessid pgtype.UUID `json:"leaseprocessid"`
Leaseexpiresat pgtype.Timestamptz `json:"leaseexpiresat"`
Lasttenantid pgtype.UUID `json:"lasttenantid"`
Lastexternalid pgtype.UUID `json:"lastexternalid"`
Lastinsertedat pgtype.Timestamptz `json:"lastinsertedat"`
}
func (q *Queries) AcquireOrExtendOLAPCutoverJobLease(ctx context.Context, db DBTX, arg AcquireOrExtendOLAPCutoverJobLeaseParams) (*V1PayloadsOlapCutoverJobOffset, error) {
row := db.QueryRow(ctx, acquireOrExtendOLAPCutoverJobLease,
arg.Key,
arg.Lastoffset,
arg.Leaseprocessid,
arg.Leaseexpiresat,
arg.Lasttenantid,
arg.Lastexternalid,
arg.Lastinsertedat,
)
var i V1PayloadsOlapCutoverJobOffset
err := row.Scan(
&i.Key,
&i.LastOffset,
&i.IsCompleted,
&i.LeaseProcessID,
&i.LeaseExpiresAt,
&i.LastTenantID,
&i.LastExternalID,
&i.LastInsertedAt,
)
return &i, err
}
@@ -1207,7 +1227,9 @@ WITH payloads AS (
FROM list_paginated_olap_payloads_for_offload(
$1::DATE,
$2::INT,
$3::BIGINT
$3::UUID,
$4::UUID,
$5::TIMESTAMPTZ
) p
)
SELECT
@@ -1222,9 +1244,11 @@ FROM payloads
`
type ListPaginatedOLAPPayloadsForOffloadParams struct {
Partitiondate pgtype.Date `json:"partitiondate"`
Limitparam int32 `json:"limitparam"`
Offsetparam int64 `json:"offsetparam"`
Partitiondate pgtype.Date `json:"partitiondate"`
Limitparam int32 `json:"limitparam"`
Lasttenantid pgtype.UUID `json:"lasttenantid"`
Lastexternalid pgtype.UUID `json:"lastexternalid"`
Lastinsertedat pgtype.Timestamptz `json:"lastinsertedat"`
}
type ListPaginatedOLAPPayloadsForOffloadRow struct {
@@ -1238,7 +1262,13 @@ type ListPaginatedOLAPPayloadsForOffloadRow struct {
}
func (q *Queries) ListPaginatedOLAPPayloadsForOffload(ctx context.Context, db DBTX, arg ListPaginatedOLAPPayloadsForOffloadParams) ([]*ListPaginatedOLAPPayloadsForOffloadRow, error) {
rows, err := db.Query(ctx, listPaginatedOLAPPayloadsForOffload, arg.Partitiondate, arg.Limitparam, arg.Offsetparam)
rows, err := db.Query(ctx, listPaginatedOLAPPayloadsForOffload,
arg.Partitiondate,
arg.Limitparam,
arg.Lasttenantid,
arg.Lastexternalid,
arg.Lastinsertedat,
)
if err != nil {
return nil, err
}

View File

@@ -16,7 +16,14 @@ type CutoverPayloadToInsert struct {
ExternalLocationKey string
}
func InsertCutOverPayloadsIntoTempTable(ctx context.Context, tx DBTX, tableName string, payloads []CutoverPayloadToInsert) (int64, error) {
type InsertCutOverPayloadsIntoTempTableRow struct {
TenantId pgtype.UUID
ID int64
InsertedAt pgtype.Timestamptz
Type V1PayloadType
}
func InsertCutOverPayloadsIntoTempTable(ctx context.Context, tx DBTX, tableName string, payloads []CutoverPayloadToInsert) (*InsertCutOverPayloadsIntoTempTableRow, error) {
tenantIds := make([]pgtype.UUID, 0, len(payloads))
ids := make([]int64, 0, len(payloads))
insertedAts := make([]pgtype.Timestamptz, 0, len(payloads))
@@ -68,8 +75,10 @@ func InsertCutOverPayloadsIntoTempTable(ctx context.Context, tx DBTX, tableName
RETURNING *
)
SELECT COUNT(*)
SELECT tenant_id, inserted_at, id, type
FROM inserts
ORDER BY tenant_id DESC, inserted_at DESC, id DESC, type DESC
LIMIT 1
`,
tableName,
),
@@ -82,10 +91,16 @@ func InsertCutOverPayloadsIntoTempTable(ctx context.Context, tx DBTX, tableName
externalLocationKeys,
)
var copyCount int64
err := row.Scan(&copyCount)
var insertRow InsertCutOverPayloadsIntoTempTableRow
return copyCount, err
err := row.Scan(
&insertRow.TenantId,
&insertRow.InsertedAt,
&insertRow.ID,
&insertRow.Type,
)
return &insertRow, err
}
func ComparePartitionRowCounts(ctx context.Context, tx DBTX, tempPartitionName, sourcePartitionName string) (bool, error) {

View File

@@ -223,7 +223,10 @@ WITH payloads AS (
FROM list_paginated_payloads_for_offload(
@partitionDate::DATE,
@limitParam::INT,
@offsetParam::BIGINT
@lastTenantId::UUID,
@lastInsertedAt::TIMESTAMPTZ,
@lastId::BIGINT,
@lastType::v1_payload_type
) p
)
SELECT
@@ -245,11 +248,29 @@ SELECT copy_v1_payload_partition_structure(@date::DATE);
SELECT swap_v1_payload_partition_with_temp(@date::DATE);
-- name: AcquireOrExtendCutoverJobLease :one
INSERT INTO v1_payload_cutover_job_offset (key, last_offset, lease_process_id, lease_expires_at)
VALUES (@key::DATE, @lastOffset::BIGINT, @leaseProcessId::UUID, @leaseExpiresAt::TIMESTAMPTZ)
INSERT INTO v1_payload_cutover_job_offset (key, lease_process_id, lease_expires_at, last_tenant_id, last_inserted_at, last_id, last_type)
VALUES (@key::DATE, @leaseProcessId::UUID, @leaseExpiresAt::TIMESTAMPTZ, @lastTenantId::UUID, @lastInsertedAt::TIMESTAMPTZ, @lastId::BIGINT, @lastType::v1_payload_type)
ON CONFLICT (key)
DO UPDATE SET
last_offset = EXCLUDED.last_offset,
-- if the lease is held by this process, then we extend the offset to the new tuple of (last_tenant_id, last_inserted_at, last_id, last_type)
-- otherwise it's a new process acquiring the lease, so we should keep the offset where it was before
last_tenant_id = CASE
WHEN EXCLUDED.lease_process_id = v1_payload_cutover_job_offset.lease_process_id THEN EXCLUDED.last_tenant_id
ELSE v1_payload_cutover_job_offset.last_tenant_id
END,
last_inserted_at = CASE
WHEN EXCLUDED.lease_process_id = v1_payload_cutover_job_offset.lease_process_id THEN EXCLUDED.last_inserted_at
ELSE v1_payload_cutover_job_offset.last_inserted_at
END,
last_id = CASE
WHEN EXCLUDED.lease_process_id = v1_payload_cutover_job_offset.lease_process_id THEN EXCLUDED.last_id
ELSE v1_payload_cutover_job_offset.last_id
END,
last_type = CASE
WHEN EXCLUDED.lease_process_id = v1_payload_cutover_job_offset.lease_process_id THEN EXCLUDED.last_type
ELSE v1_payload_cutover_job_offset.last_type
END,
lease_process_id = EXCLUDED.lease_process_id,
lease_expires_at = EXCLUDED.lease_expires_at
WHERE v1_payload_cutover_job_offset.lease_expires_at < NOW() OR v1_payload_cutover_job_offset.lease_process_id = @leaseProcessId::UUID

View File

@@ -12,38 +12,65 @@ import (
)
const acquireOrExtendCutoverJobLease = `-- name: AcquireOrExtendCutoverJobLease :one
INSERT INTO v1_payload_cutover_job_offset (key, last_offset, lease_process_id, lease_expires_at)
VALUES ($1::DATE, $2::BIGINT, $3::UUID, $4::TIMESTAMPTZ)
INSERT INTO v1_payload_cutover_job_offset (key, lease_process_id, lease_expires_at, last_tenant_id, last_inserted_at, last_id, last_type)
VALUES ($1::DATE, $2::UUID, $3::TIMESTAMPTZ, $4::UUID, $5::TIMESTAMPTZ, $6::BIGINT, $7::v1_payload_type)
ON CONFLICT (key)
DO UPDATE SET
last_offset = EXCLUDED.last_offset,
-- if the lease is held by this process, then we extend the offset to the new tuple of (last_tenant_id, last_inserted_at, last_id, last_type)
-- otherwise it's a new process acquiring the lease, so we should keep the offset where it was before
last_tenant_id = CASE
WHEN EXCLUDED.lease_process_id = v1_payload_cutover_job_offset.lease_process_id THEN EXCLUDED.last_tenant_id
ELSE v1_payload_cutover_job_offset.last_tenant_id
END,
last_inserted_at = CASE
WHEN EXCLUDED.lease_process_id = v1_payload_cutover_job_offset.lease_process_id THEN EXCLUDED.last_inserted_at
ELSE v1_payload_cutover_job_offset.last_inserted_at
END,
last_id = CASE
WHEN EXCLUDED.lease_process_id = v1_payload_cutover_job_offset.lease_process_id THEN EXCLUDED.last_id
ELSE v1_payload_cutover_job_offset.last_id
END,
last_type = CASE
WHEN EXCLUDED.lease_process_id = v1_payload_cutover_job_offset.lease_process_id THEN EXCLUDED.last_type
ELSE v1_payload_cutover_job_offset.last_type
END,
lease_process_id = EXCLUDED.lease_process_id,
lease_expires_at = EXCLUDED.lease_expires_at
WHERE v1_payload_cutover_job_offset.lease_expires_at < NOW() OR v1_payload_cutover_job_offset.lease_process_id = $3::UUID
RETURNING key, last_offset, is_completed, lease_process_id, lease_expires_at
WHERE v1_payload_cutover_job_offset.lease_expires_at < NOW() OR v1_payload_cutover_job_offset.lease_process_id = $2::UUID
RETURNING key, is_completed, lease_process_id, lease_expires_at, last_tenant_id, last_inserted_at, last_id, last_type
`
type AcquireOrExtendCutoverJobLeaseParams struct {
Key pgtype.Date `json:"key"`
Lastoffset int64 `json:"lastoffset"`
Leaseprocessid pgtype.UUID `json:"leaseprocessid"`
Leaseexpiresat pgtype.Timestamptz `json:"leaseexpiresat"`
Lasttenantid pgtype.UUID `json:"lasttenantid"`
Lastinsertedat pgtype.Timestamptz `json:"lastinsertedat"`
Lastid int64 `json:"lastid"`
Lasttype V1PayloadType `json:"lasttype"`
}
func (q *Queries) AcquireOrExtendCutoverJobLease(ctx context.Context, db DBTX, arg AcquireOrExtendCutoverJobLeaseParams) (*V1PayloadCutoverJobOffset, error) {
row := db.QueryRow(ctx, acquireOrExtendCutoverJobLease,
arg.Key,
arg.Lastoffset,
arg.Leaseprocessid,
arg.Leaseexpiresat,
arg.Lasttenantid,
arg.Lastinsertedat,
arg.Lastid,
arg.Lasttype,
)
var i V1PayloadCutoverJobOffset
err := row.Scan(
&i.Key,
&i.LastOffset,
&i.IsCompleted,
&i.LeaseProcessID,
&i.LeaseExpiresAt,
&i.LastTenantID,
&i.LastInsertedAt,
&i.LastID,
&i.LastType,
)
return &i, err
}
@@ -127,7 +154,10 @@ WITH payloads AS (
FROM list_paginated_payloads_for_offload(
$1::DATE,
$2::INT,
$3::BIGINT
$3::UUID,
$4::TIMESTAMPTZ,
$5::BIGINT,
$6::v1_payload_type
) p
)
SELECT
@@ -144,9 +174,12 @@ FROM payloads
`
type ListPaginatedPayloadsForOffloadParams struct {
Partitiondate pgtype.Date `json:"partitiondate"`
Limitparam int32 `json:"limitparam"`
Offsetparam int64 `json:"offsetparam"`
Partitiondate pgtype.Date `json:"partitiondate"`
Limitparam int32 `json:"limitparam"`
Lasttenantid pgtype.UUID `json:"lasttenantid"`
Lastinsertedat pgtype.Timestamptz `json:"lastinsertedat"`
Lastid int64 `json:"lastid"`
Lasttype V1PayloadType `json:"lasttype"`
}
type ListPaginatedPayloadsForOffloadRow struct {
@@ -162,7 +195,14 @@ type ListPaginatedPayloadsForOffloadRow struct {
}
func (q *Queries) ListPaginatedPayloadsForOffload(ctx context.Context, db DBTX, arg ListPaginatedPayloadsForOffloadParams) ([]*ListPaginatedPayloadsForOffloadRow, error) {
rows, err := db.Query(ctx, listPaginatedPayloadsForOffload, arg.Partitiondate, arg.Limitparam, arg.Offsetparam)
rows, err := db.Query(ctx, listPaginatedPayloadsForOffload,
arg.Partitiondate,
arg.Limitparam,
arg.Lasttenantid,
arg.Lastinsertedat,
arg.Lastid,
arg.Lasttype,
)
if err != nil {
return nil, err
}

View File

@@ -1758,10 +1758,14 @@ $$;
CREATE TABLE v1_payload_cutover_job_offset (
key DATE PRIMARY KEY,
last_offset BIGINT NOT NULL,
is_completed BOOLEAN NOT NULL DEFAULT FALSE,
lease_process_id UUID NOT NULL DEFAULT gen_random_uuid(),
lease_expires_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
lease_expires_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
last_tenant_id UUID NOT NULL DEFAULT '00000000-0000-0000-0000-000000000000'::UUID,
last_inserted_at TIMESTAMPTZ NOT NULL DEFAULT '1970-01-01 00:00:00+00',
last_id BIGINT NOT NULL DEFAULT 0,
last_type v1_payload_type NOT NULL DEFAULT 'TASK_INPUT'
);
CREATE OR REPLACE FUNCTION copy_v1_payload_partition_structure(
@@ -1875,7 +1879,10 @@ $$;
CREATE OR REPLACE FUNCTION list_paginated_payloads_for_offload(
partition_date date,
limit_param int,
offset_param bigint
last_tenant_id uuid,
last_inserted_at timestamptz,
last_id bigint,
last_type v1_payload_type
) RETURNS TABLE (
tenant_id UUID,
id BIGINT,
@@ -1909,12 +1916,12 @@ BEGIN
SELECT tenant_id, id, inserted_at, external_id, type, location,
external_location_key, inline_content, updated_at
FROM %I
WHERE (tenant_id, inserted_at, id, type) > ($1, $2, $3, $4)
ORDER BY tenant_id, inserted_at, id, type
LIMIT $1
OFFSET $2
LIMIT $5
', source_partition_name);
RETURN QUERY EXECUTE query USING limit_param, offset_param;
RETURN QUERY EXECUTE query USING last_tenant_id, last_inserted_at, last_id, last_type, limit_param;
END;
$$;

View File

@@ -801,10 +801,13 @@ EXECUTE FUNCTION v1_events_lookup_table_olap_insert_function();
CREATE TABLE v1_payloads_olap_cutover_job_offset (
key DATE PRIMARY KEY,
last_offset BIGINT NOT NULL,
is_completed BOOLEAN NOT NULL DEFAULT FALSE,
lease_process_id UUID NOT NULL DEFAULT gen_random_uuid(),
lease_expires_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
lease_expires_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
last_tenant_id UUID NOT NULL DEFAULT '00000000-0000-0000-0000-000000000000'::UUID,
last_external_id UUID NOT NULL DEFAULT '00000000-0000-0000-0000-000000000000'::UUID,
last_inserted_at TIMESTAMPTZ NOT NULL DEFAULT '1970-01-01 00:00:00+00'
);
CREATE OR REPLACE FUNCTION copy_v1_payloads_olap_partition_structure(
@@ -918,7 +921,9 @@ $$;
CREATE OR REPLACE FUNCTION list_paginated_olap_payloads_for_offload(
partition_date date,
limit_param int,
offset_param bigint
last_tenant_id uuid,
last_external_id uuid,
last_inserted_at timestamptz
) RETURNS TABLE (
tenant_id UUID,
external_id UUID,
@@ -949,12 +954,12 @@ BEGIN
query := format('
SELECT tenant_id, external_id, location, external_location_key, inline_content, inserted_at, updated_at
FROM %I
WHERE (tenant_id, external_id, inserted_at) > ($1, $2, $3)
ORDER BY tenant_id, external_id, inserted_at
LIMIT $1
OFFSET $2
LIMIT $4
', source_partition_name);
RETURN QUERY EXECUTE query USING limit_param, offset_param;
RETURN QUERY EXECUTE query USING last_tenant_id, last_external_id, last_inserted_at, limit_param;
END;
$$;