diff --git a/cmd/hatchet-migrate/migrate/migrations/20251210193031_v1_0_58.sql b/cmd/hatchet-migrate/migrate/migrations/20251210193031_v1_0_58.sql
new file mode 100644
index 000000000..f36bbffeb
--- /dev/null
+++ b/cmd/hatchet-migrate/migrate/migrations/20251210193031_v1_0_58.sql
@@ -0,0 +1,193 @@
+-- +goose Up
+-- +goose StatementBegin
+CREATE OR REPLACE FUNCTION list_paginated_payloads_for_offload(
+    partition_date date,
+    limit_param int,
+    last_tenant_id uuid,
+    last_inserted_at timestamptz,
+    last_id bigint,
+    last_type v1_payload_type
+) RETURNS TABLE (
+    tenant_id UUID,
+    id BIGINT,
+    inserted_at TIMESTAMPTZ,
+    external_id UUID,
+    type v1_payload_type,
+    location v1_payload_location,
+    external_location_key TEXT,
+    inline_content JSONB,
+    updated_at TIMESTAMPTZ
+)
+    LANGUAGE plpgsql AS
+$$
+DECLARE
+    partition_date_str varchar;
+    source_partition_name varchar;
+    query text;
+BEGIN
+    IF partition_date IS NULL THEN
+        RAISE EXCEPTION 'partition_date parameter cannot be NULL';
+    END IF;
+
+    SELECT to_char(partition_date, 'YYYYMMDD') INTO partition_date_str;
+    SELECT format('v1_payload_%s', partition_date_str) INTO source_partition_name;
+
+    IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = source_partition_name) THEN
+        RAISE EXCEPTION 'Partition % does not exist', source_partition_name;
+    END IF;
+
+    query := format('
+        SELECT tenant_id, id, inserted_at, external_id, type, location,
+               external_location_key, inline_content, updated_at
+        FROM %I
+        WHERE (tenant_id, inserted_at, id, type) >= ($1, $2, $3, $4)
+        ORDER BY tenant_id, inserted_at, id, type
+        LIMIT $5
+    ', source_partition_name);
+
+    RETURN QUERY EXECUTE query USING last_tenant_id, last_inserted_at, last_id, last_type, limit_param;
+END;
+$$;
+
+CREATE OR REPLACE FUNCTION list_paginated_olap_payloads_for_offload(
+    partition_date date,
+    limit_param int,
+    last_tenant_id uuid,
+    last_external_id uuid,
+    last_inserted_at timestamptz
+) RETURNS TABLE (
+    tenant_id UUID,
+    external_id UUID,
+    location v1_payload_location_olap,
+    external_location_key TEXT,
+    inline_content JSONB,
+    inserted_at TIMESTAMPTZ,
+    updated_at TIMESTAMPTZ
+)
+    LANGUAGE plpgsql AS
+$$
+DECLARE
+    partition_date_str varchar;
+    source_partition_name varchar;
+    query text;
+BEGIN
+    IF partition_date IS NULL THEN
+        RAISE EXCEPTION 'partition_date parameter cannot be NULL';
+    END IF;
+
+    SELECT to_char(partition_date, 'YYYYMMDD') INTO partition_date_str;
+    SELECT format('v1_payloads_olap_%s', partition_date_str) INTO source_partition_name;
+
+    IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = source_partition_name) THEN
+        RAISE EXCEPTION 'Partition % does not exist', source_partition_name;
+    END IF;
+
+    query := format('
+        SELECT tenant_id, external_id, location, external_location_key, inline_content, inserted_at, updated_at
+        FROM %I
+        WHERE (tenant_id, external_id, inserted_at) >= ($1, $2, $3)
+        ORDER BY tenant_id, external_id, inserted_at
+        LIMIT $4
+    ', source_partition_name);
+
+    RETURN QUERY EXECUTE query USING last_tenant_id, last_external_id, last_inserted_at, limit_param;
+END;
+$$;
+-- +goose StatementEnd
+
+-- +goose Down
+-- +goose StatementBegin
+CREATE OR REPLACE FUNCTION list_paginated_payloads_for_offload(
+    partition_date date,
+    limit_param int,
+    last_tenant_id uuid,
+    last_inserted_at timestamptz,
+    last_id bigint,
+    last_type v1_payload_type
+) RETURNS TABLE (
+    tenant_id UUID,
+    id BIGINT,
+    inserted_at TIMESTAMPTZ,
+    external_id UUID,
+    type v1_payload_type,
+    location v1_payload_location,
+    external_location_key TEXT,
+    inline_content JSONB,
+    updated_at TIMESTAMPTZ
+)
+    LANGUAGE plpgsql AS
+$$
+DECLARE
+    partition_date_str varchar;
+    source_partition_name varchar;
+    query text;
+BEGIN
+    IF partition_date IS NULL THEN
+        RAISE EXCEPTION 'partition_date parameter cannot be NULL';
+    END IF;
+
+    SELECT to_char(partition_date, 'YYYYMMDD') INTO partition_date_str;
+    SELECT format('v1_payload_%s', partition_date_str) INTO source_partition_name;
+
+    IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = source_partition_name) THEN
+        RAISE EXCEPTION 'Partition % does not exist', source_partition_name;
+    END IF;
+
+    query := format('
+        SELECT tenant_id, id, inserted_at, external_id, type, location,
+               external_location_key, inline_content, updated_at
+        FROM %I
+        WHERE (tenant_id, inserted_at, id, type) > ($1, $2, $3, $4)
+        ORDER BY tenant_id, inserted_at, id, type
+        LIMIT $5
+    ', source_partition_name);
+
+    RETURN QUERY EXECUTE query USING last_tenant_id, last_inserted_at, last_id, last_type, limit_param;
+END;
+$$;
+
+CREATE OR REPLACE FUNCTION list_paginated_olap_payloads_for_offload(
+    partition_date date,
+    limit_param int,
+    last_tenant_id uuid,
+    last_external_id uuid,
+    last_inserted_at timestamptz
+) RETURNS TABLE (
+    tenant_id UUID,
+    external_id UUID,
+    location v1_payload_location_olap,
+    external_location_key TEXT,
+    inline_content JSONB,
+    inserted_at TIMESTAMPTZ,
+    updated_at TIMESTAMPTZ
+)
+    LANGUAGE plpgsql AS
+$$
+DECLARE
+    partition_date_str varchar;
+    source_partition_name varchar;
+    query text;
+BEGIN
+    IF partition_date IS NULL THEN
+        RAISE EXCEPTION 'partition_date parameter cannot be NULL';
+    END IF;
+
+    SELECT to_char(partition_date, 'YYYYMMDD') INTO partition_date_str;
+    SELECT format('v1_payloads_olap_%s', partition_date_str) INTO source_partition_name;
+
+    IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = source_partition_name) THEN
+        RAISE EXCEPTION 'Partition % does not exist', source_partition_name;
+    END IF;
+
+    query := format('
+        SELECT tenant_id, external_id, location, external_location_key, inline_content, inserted_at, updated_at
+        FROM %I
+        WHERE (tenant_id, external_id, inserted_at) > ($1, $2, $3)
+        ORDER BY tenant_id, external_id, inserted_at
+        LIMIT $4
+    ', source_partition_name);
+
+    RETURN QUERY EXECUTE query USING last_tenant_id, last_external_id, last_inserted_at, limit_param;
+END;
+$$;
+-- +goose StatementEnd
diff --git a/internal/services/controllers/v1/olap/controller.go b/internal/services/controllers/v1/olap/controller.go
index f10dd1b98..ef667ce1c 100644
--- a/internal/services/controllers/v1/olap/controller.go
+++ b/internal/services/controllers/v1/olap/controller.go
@@ -865,7 +865,7 @@ func (oc *OLAPControllerImpl) processPayloadExternalCutovers(ctx context.Context
     oc.l.Debug().Msgf("payload external cutover: processing external cutover payloads")

     p := oc.repo.Payloads()
-    err := oc.repo.OLAP().ProcessOLAPPayloadCutovers(ctx, p.ExternalStoreEnabled(), p.InlineStoreTTL(), p.ExternalCutoverBatchSize())
+    err := oc.repo.OLAP().ProcessOLAPPayloadCutovers(ctx, p.ExternalStoreEnabled(), p.InlineStoreTTL(), p.ExternalCutoverBatchSize(), p.ExternalCutoverNumConcurrentOffloads())

     if err != nil {
         span.RecordError(err)
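Editor's note on the migration above: both pagination helpers now use an inclusive row-value comparison (`>=`) where the previous definitions (restored by the Down migration) used `>`. Postgres compares row values lexicographically, so the boundary tuple itself is now included in the page. A minimal sketch of that comparison rule, with hypothetical integer keys standing in for the real `(tenant_id, inserted_at, id, type)` tuple:

```go
package main

import "fmt"

// tupleGTE mirrors Postgres row-value comparison: (a1, a2, ...) >= (b1, b2, ...)
// compares element by element and decides at the first inequality.
func tupleGTE(a, b []int) bool {
	for i := range a {
		if a[i] != b[i] {
			return a[i] > b[i]
		}
	}
	return true // all elements equal: >= matches the boundary row, > would not
}

func main() {
	boundary := []int{7, 100, 3, 1}
	fmt.Println(tupleGTE([]int{7, 100, 3, 1}, boundary)) // true: the boundary row itself is included
	fmt.Println(tupleGTE([]int{7, 100, 2, 9}, boundary)) // false: sorts strictly before the boundary
}
```

The inclusive form matters later in this diff, where each concurrent worker is seeded with the first key of its chunk rather than the last key of the previous page.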
diff --git a/pkg/config/loader/loader.go b/pkg/config/loader/loader.go
index 99a98c743..500412989 100644
--- a/pkg/config/loader/loader.go
+++ b/pkg/config/loader/loader.go
@@ -302,14 +302,15 @@ func (c *ConfigLoader) InitDataLayer() (res *database.Layer, err error) {
     inlineStoreTTL := time.Duration(inlineStoreTTLDays) * 24 * time.Hour

     payloadStoreOpts := repov1.PayloadStoreRepositoryOpts{
-        EnablePayloadDualWrites:          scf.PayloadStore.EnablePayloadDualWrites,
-        EnableTaskEventPayloadDualWrites: scf.PayloadStore.EnableTaskEventPayloadDualWrites,
-        EnableOLAPPayloadDualWrites:      scf.PayloadStore.EnableOLAPPayloadDualWrites,
-        EnableDagDataPayloadDualWrites:   scf.PayloadStore.EnableDagDataPayloadDualWrites,
-        ExternalCutoverProcessInterval:   scf.PayloadStore.ExternalCutoverProcessInterval,
-        ExternalCutoverBatchSize:         scf.PayloadStore.ExternalCutoverBatchSize,
-        InlineStoreTTL:                   &inlineStoreTTL,
-        EnableImmediateOffloads:          scf.PayloadStore.EnableImmediateOffloads,
+        EnablePayloadDualWrites:              scf.PayloadStore.EnablePayloadDualWrites,
+        EnableTaskEventPayloadDualWrites:     scf.PayloadStore.EnableTaskEventPayloadDualWrites,
+        EnableOLAPPayloadDualWrites:          scf.PayloadStore.EnableOLAPPayloadDualWrites,
+        EnableDagDataPayloadDualWrites:       scf.PayloadStore.EnableDagDataPayloadDualWrites,
+        ExternalCutoverProcessInterval:       scf.PayloadStore.ExternalCutoverProcessInterval,
+        ExternalCutoverBatchSize:             scf.PayloadStore.ExternalCutoverBatchSize,
+        ExternalCutoverNumConcurrentOffloads: scf.PayloadStore.ExternalCutoverNumConcurrentOffloads,
+        InlineStoreTTL:                       &inlineStoreTTL,
+        EnableImmediateOffloads:              scf.PayloadStore.EnableImmediateOffloads,
     }

     statusUpdateOpts := repov1.StatusUpdateBatchSizeLimits{
default:"1000"` + ExternalCutoverNumConcurrentOffloads int32 `mapstructure:"externalCutoverNumConcurrentOffloads" json:"externalCutoverNumConcurrentOffloads,omitempty" default:"10"` + InlineStoreTTLDays int32 `mapstructure:"inlineStoreTTLDays" json:"inlineStoreTTLDays,omitempty" default:"2"` + EnableImmediateOffloads bool `mapstructure:"enableImmediateOffloads" json:"enableImmediateOffloads,omitempty" default:"false"` } func (c *ServerConfig) HasService(name string) bool { @@ -922,6 +923,7 @@ func BindAllEnv(v *viper.Viper) { _ = v.BindEnv("payloadStore.enableOLAPPayloadDualWrites", "SERVER_PAYLOAD_STORE_ENABLE_OLAP_PAYLOAD_DUAL_WRITES") _ = v.BindEnv("payloadStore.externalCutoverProcessInterval", "SERVER_PAYLOAD_STORE_EXTERNAL_CUTOVER_PROCESS_INTERVAL") _ = v.BindEnv("payloadStore.externalCutoverBatchSize", "SERVER_PAYLOAD_STORE_EXTERNAL_CUTOVER_BATCH_SIZE") + _ = v.BindEnv("payloadStore.externalCutoverNumConcurrentOffloads", "SERVER_PAYLOAD_STORE_EXTERNAL_CUTOVER_NUM_CONCURRENT_OFFLOADS") _ = v.BindEnv("payloadStore.inlineStoreTTLDays", "SERVER_PAYLOAD_STORE_INLINE_STORE_TTL_DAYS") _ = v.BindEnv("payloadStore.enableImmediateOffloads", "SERVER_PAYLOAD_STORE_ENABLE_IMMEDIATE_OFFLOADS") diff --git a/pkg/repository/v1/olap.go b/pkg/repository/v1/olap.go index 21464d7f8..83106942b 100644 --- a/pkg/repository/v1/olap.go +++ b/pkg/repository/v1/olap.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "log" + "maps" "math/rand" "sort" "sync" @@ -267,7 +268,7 @@ type OLAPRepository interface { ListWorkflowRunExternalIds(ctx context.Context, tenantId string, opts ListWorkflowRunOpts) ([]pgtype.UUID, error) - ProcessOLAPPayloadCutovers(ctx context.Context, externalStoreEnabled bool, inlineStoreTTL *time.Duration, externalCutoverBatchSize int32) error + ProcessOLAPPayloadCutovers(ctx context.Context, externalStoreEnabled bool, inlineStoreTTL *time.Duration, externalCutoverBatchSize, externalCutoverNumConcurrentOffloads int32) error } type StatusUpdateBatchSizeLimits struct { @@ -2708,10 +2709,115 @@ type OLAPCutoverBatchOutcome struct { NextPagination OLAPPaginationParams } -func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context, processId pgtype.UUID, partitionDate PartitionDate, pagination OLAPPaginationParams) (*OLAPCutoverBatchOutcome, error) { +func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context, processId pgtype.UUID, partitionDate PartitionDate, pagination OLAPPaginationParams, externalCutoverBatchSize, externalCutoverNumConcurrentOffloads int32) (*OLAPCutoverBatchOutcome, error) { ctx, span := telemetry.NewSpan(ctx, "OLAPRepository.processOLAPPayloadCutoverBatch") defer span.End() + tableName := fmt.Sprintf("v1_payloads_olap_offload_tmp_%s", partitionDate.String()) + windowSize := externalCutoverBatchSize * externalCutoverNumConcurrentOffloads + + payloadRanges, err := p.queries.CreateOLAPPayloadRangeChunks(ctx, p.pool, sqlcv1.CreateOLAPPayloadRangeChunksParams{ + Chunksize: externalCutoverBatchSize, + Partitiondate: pgtype.Date(partitionDate), + Windowsize: windowSize, + Lasttenantid: pagination.LastTenantId, + Lastexternalid: pagination.LastExternalId, + Lastinsertedat: pagination.LastInsertedAt, + }) + + if err != nil && !errors.Is(err, pgx.ErrNoRows) { + return nil, fmt.Errorf("failed to create payload range chunks: %w", err) + } + + if errors.Is(err, pgx.ErrNoRows) { + return &OLAPCutoverBatchOutcome{ + ShouldContinue: false, + NextPagination: pagination, + }, nil + } + + mu := sync.Mutex{} + eg := errgroup.Group{} + + externalIdToKey := 
diff --git a/pkg/repository/v1/olap.go b/pkg/repository/v1/olap.go
index 21464d7f8..83106942b 100644
--- a/pkg/repository/v1/olap.go
+++ b/pkg/repository/v1/olap.go
@@ -6,6 +6,7 @@ import (
     "errors"
     "fmt"
     "log"
+    "maps"
     "math/rand"
     "sort"
     "sync"
@@ -267,7 +268,7 @@ type OLAPRepository interface {

     ListWorkflowRunExternalIds(ctx context.Context, tenantId string, opts ListWorkflowRunOpts) ([]pgtype.UUID, error)

-    ProcessOLAPPayloadCutovers(ctx context.Context, externalStoreEnabled bool, inlineStoreTTL *time.Duration, externalCutoverBatchSize int32) error
+    ProcessOLAPPayloadCutovers(ctx context.Context, externalStoreEnabled bool, inlineStoreTTL *time.Duration, externalCutoverBatchSize, externalCutoverNumConcurrentOffloads int32) error
 }

 type StatusUpdateBatchSizeLimits struct {
@@ -2708,10 +2709,115 @@ type OLAPCutoverBatchOutcome struct {
     NextPagination OLAPPaginationParams
 }

-func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context, processId pgtype.UUID, partitionDate PartitionDate, pagination OLAPPaginationParams) (*OLAPCutoverBatchOutcome, error) {
+func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context, processId pgtype.UUID, partitionDate PartitionDate, pagination OLAPPaginationParams, externalCutoverBatchSize, externalCutoverNumConcurrentOffloads int32) (*OLAPCutoverBatchOutcome, error) {
     ctx, span := telemetry.NewSpan(ctx, "OLAPRepository.processOLAPPayloadCutoverBatch")
     defer span.End()

+    tableName := fmt.Sprintf("v1_payloads_olap_offload_tmp_%s", partitionDate.String())
+    windowSize := externalCutoverBatchSize * externalCutoverNumConcurrentOffloads
+
+    payloadRanges, err := p.queries.CreateOLAPPayloadRangeChunks(ctx, p.pool, sqlcv1.CreateOLAPPayloadRangeChunksParams{
+        Chunksize:      externalCutoverBatchSize,
+        Partitiondate:  pgtype.Date(partitionDate),
+        Windowsize:     windowSize,
+        Lasttenantid:   pagination.LastTenantId,
+        Lastexternalid: pagination.LastExternalId,
+        Lastinsertedat: pagination.LastInsertedAt,
+    })
+
+    if err != nil && !errors.Is(err, pgx.ErrNoRows) {
+        return nil, fmt.Errorf("failed to create payload range chunks: %w", err)
+    }
+
+    if errors.Is(err, pgx.ErrNoRows) {
+        return &OLAPCutoverBatchOutcome{
+            ShouldContinue: false,
+            NextPagination: pagination,
+        }, nil
+    }
+
+    mu := sync.Mutex{}
+    eg := errgroup.Group{}
+
+    externalIdToKey := make(map[PayloadExternalId]ExternalPayloadLocationKey)
+    externalIdToPayload := make(map[PayloadExternalId]sqlcv1.ListPaginatedOLAPPayloadsForOffloadRow)
+    numPayloads := 0
+
+    for _, payloadRange := range payloadRanges {
+        pr := payloadRange
+        eg.Go(func() error {
+            payloads, err := p.queries.ListPaginatedOLAPPayloadsForOffload(ctx, p.pool, sqlcv1.ListPaginatedOLAPPayloadsForOffloadParams{
+                Partitiondate:  pgtype.Date(partitionDate),
+                Limitparam:     externalCutoverBatchSize,
+                Lasttenantid:   pr.TenantID,
+                Lastexternalid: pr.ExternalID,
+                Lastinsertedat: pr.InsertedAt,
+            })
+
+            if err != nil {
+                return fmt.Errorf("failed to list paginated payloads for offload: %w", err)
+            }
+
+            alreadyExternalPayloads := make(map[PayloadExternalId]ExternalPayloadLocationKey)
+            externalIdToPayloadInner := make(map[PayloadExternalId]sqlcv1.ListPaginatedOLAPPayloadsForOffloadRow)
+            tenantIdToOffloadOpts := make(map[TenantID][]StoreOLAPPayloadOpts)
+
+            for _, payload := range payloads {
+                externalId := PayloadExternalId(payload.ExternalID.String())
+                externalIdToPayloadInner[externalId] = *payload
+
+                if payload.Location != sqlcv1.V1PayloadLocationOlapINLINE {
+                    alreadyExternalPayloads[externalId] = ExternalPayloadLocationKey(payload.ExternalLocationKey)
+                } else {
+                    tenantIdToOffloadOpts[TenantID(payload.TenantID.String())] = append(tenantIdToOffloadOpts[TenantID(payload.TenantID.String())], StoreOLAPPayloadOpts{
+                        InsertedAt: payload.InsertedAt,
+                        Payload:    payload.InlineContent,
+                        ExternalId: payload.ExternalID,
+                    })
+                }
+            }
+
+            externalIdToKeyInner := make(map[PayloadExternalId]ExternalPayloadLocationKey)
+
+            for tenant, opts := range tenantIdToOffloadOpts {
+                externalIdToKeyForTenant, err := p.PutPayloads(ctx, p.pool, tenant, opts...)
+
+                if err != nil {
+                    return fmt.Errorf("failed to offload olap payloads for tenant %s: %w", tenant, err)
+                }
+
+                maps.Copy(externalIdToKeyInner, externalIdToKeyForTenant)
+            }
+
+            mu.Lock()
+            maps.Copy(externalIdToKey, externalIdToKeyInner)
+            maps.Copy(externalIdToKey, alreadyExternalPayloads)
+            maps.Copy(externalIdToPayload, externalIdToPayloadInner)
+            numPayloads += len(payloads)
+            mu.Unlock()
+
+            return nil
+        })
+    }
+
+    err = eg.Wait()
+
+    if err != nil {
+        return nil, err
+    }
+
+    span.SetAttributes(attribute.Int("num_payloads_read", numPayloads))
+
+    payloadsToInsert := make([]sqlcv1.CutoverOLAPPayloadToInsert, 0, numPayloads)
+
+    for externalId, key := range externalIdToKey {
+        payload := externalIdToPayload[externalId]
+
+        payloadsToInsert = append(payloadsToInsert, sqlcv1.CutoverOLAPPayloadToInsert{
+            TenantID:            payload.TenantID,
+            InsertedAt:          payload.InsertedAt,
+            ExternalID:          sqlchelpers.UUIDFromStr(string(externalId)),
+            ExternalLocationKey: string(key),
+        })
+    }
+
     tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l, 10000)

     if err != nil {
@@ -2720,86 +2826,25 @@ func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context,

     defer rollback()

-    tableName := fmt.Sprintf("v1_payloads_olap_offload_tmp_%s", partitionDate.String())
-
-    payloads, err := p.queries.ListPaginatedOLAPPayloadsForOffload(ctx, tx, sqlcv1.ListPaginatedOLAPPayloadsForOffloadParams{
-        Partitiondate:  pgtype.Date(partitionDate),
-        Lasttenantid:   pagination.LastTenantId,
-        Lastexternalid: pagination.LastExternalId,
-        Lastinsertedat: pagination.LastInsertedAt,
-        Limitparam:     pagination.Limit,
-    })
-
-    if err != nil {
-        return nil, fmt.Errorf("failed to list payloads for offload: %w", err)
-    }
-
-    span.SetAttributes(attribute.Int("num_payloads_read", len(payloads)))
-
-    tenantIdToOffloadOpts := make(map[TenantID][]StoreOLAPPayloadOpts)
-    externalIdToKey := make(map[PayloadExternalId]ExternalPayloadLocationKey)
-    externalIdToPayload := make(map[PayloadExternalId]sqlcv1.ListPaginatedOLAPPayloadsForOffloadRow)
-
-    for _, payload := range payloads {
-        externalIdToPayload[PayloadExternalId(payload.ExternalID.String())] = *payload
-
-        if payload.Location != sqlcv1.V1PayloadLocationOlapINLINE {
-            externalIdToKey[PayloadExternalId(payload.ExternalID.String())] = ExternalPayloadLocationKey(payload.ExternalLocationKey)
-        } else {
-            tenantIdToOffloadOpts[TenantID(payload.TenantID.String())] = append(tenantIdToOffloadOpts[TenantID(payload.TenantID.String())], StoreOLAPPayloadOpts{
-                InsertedAt: payload.InsertedAt,
-                Payload:    payload.InlineContent,
-                ExternalId: payload.ExternalID,
-            })
-        }
-    }
-
-    for tenant, opts := range tenantIdToOffloadOpts {
-        externalIdToKeyInner, err := p.PutPayloads(ctx, tx, tenant, opts...)
-
-        if err != nil {
-            return nil, fmt.Errorf("failed to offload olap payloads for tenant %s: %w", tenant, err)
-        }
-
-        for externalId, key := range externalIdToKeyInner {
-            externalIdToKey[externalId] = key
-        }
-    }
-
-    payloadsToInsert := make([]sqlcv1.CutoverOLAPPayloadToInsert, 0, len(payloads))
-
-    for externalId, key := range externalIdToKey {
-        payload := externalIdToPayload[externalId]
-
-        payloadsToInsert = append(payloadsToInsert, sqlcv1.CutoverOLAPPayloadToInsert{
-            TenantID:            payload.TenantID,
-            InsertedAt:          payload.InsertedAt,
-            ExternalID:          sqlchelpers.UUIDFromStr(string(externalId)),
-            ExternalLocationKey: string(key),
-        })
-    }
-
-    insertResult, err := sqlcv1.InsertCutOverOLAPPayloadsIntoTempTable(ctx, tx, tableName, payloadsToInsert)
+    inserted, err := sqlcv1.InsertCutOverOLAPPayloadsIntoTempTable(ctx, tx, tableName, payloadsToInsert)

     if err != nil && !errors.Is(err, pgx.ErrNoRows) {
         return nil, fmt.Errorf("failed to copy offloaded payloads into temp table: %w", err)
     }

-    isNoRows := errors.Is(err, pgx.ErrNoRows)
+    if errors.Is(err, pgx.ErrNoRows) {
+        return &OLAPCutoverBatchOutcome{
+            ShouldContinue: false,
+            NextPagination: pagination,
+        }, nil
+    }

-    params := OLAPPaginationParams{
-        LastTenantId:   insertResult.TenantId,
-        LastInsertedAt: insertResult.InsertedAt,
-        LastExternalId: insertResult.ExternalId,
+    extendedLease, err := p.acquireOrExtendJobLease(ctx, tx, processId, partitionDate, OLAPPaginationParams{
+        LastTenantId:   inserted.TenantId,
+        LastInsertedAt: inserted.InsertedAt,
+        LastExternalId: inserted.ExternalId,
         Limit:          pagination.Limit,
-    }
-
-    // hack so that we don't have errors from zero values when no rows are returned
-    if isNoRows {
-        params = pagination
-    }
-
-    extendedLease, err := p.acquireOrExtendJobLease(ctx, tx, processId, partitionDate, params)
+    })

     if err != nil {
         return nil, fmt.Errorf("failed to extend cutover job lease: %w", err)
@@ -2809,7 +2854,7 @@ func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context,
         return nil, fmt.Errorf("failed to commit copy offloaded payloads transaction: %w", err)
     }

-    if len(payloads) < int(pagination.Limit) || isNoRows {
+    if numPayloads < int(windowSize) {
         return &OLAPCutoverBatchOutcome{
             ShouldContinue: false,
             NextPagination: extendedLease.Pagination,
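Note that the stop condition above changed shape along with the fan-out: the old code ended the loop when a single page came back short (`len(payloads) < limit || isNoRows`), while the new code ends it when the whole fanned-out window is short (`numPayloads < windowSize`). A self-contained sketch of the driver loop this implies, with `processBatch` as a hypothetical stand-in for the batch function and an integer standing in for the pagination cursor:

```go
package main

import "context"

type outcome struct {
	ShouldContinue bool
	NextPagination int // stand-in for the real pagination cursor
}

// processBatch is a stand-in for processOLAPPayloadCutoverBatch.
func processBatch(ctx context.Context, pagination int) (outcome, error) {
	return outcome{ShouldContinue: false, NextPagination: pagination}, nil
}

// drainPartition sketches the loop implied by ShouldContinue: keep going
// while full windows come back; a short window means the partition is drained.
func drainPartition(ctx context.Context, pagination int) error {
	for {
		o, err := processBatch(ctx, pagination)
		if err != nil {
			return err
		}

		pagination = o.NextPagination

		if !o.ShouldContinue {
			return nil
		}
	}
}

func main() {
	_ = drainPartition(context.Background(), 0)
}
```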
@@ -2923,7 +2968,7 @@ func (p *OLAPRepositoryImpl) prepareCutoverTableJob(ctx context.Context, process
     }, nil
 }

-func (p *OLAPRepositoryImpl) processSinglePartition(ctx context.Context, processId pgtype.UUID, partitionDate PartitionDate, inlineStoreTTL *time.Duration, externalCutoverBatchSize int32) error {
+func (p *OLAPRepositoryImpl) processSinglePartition(ctx context.Context, processId pgtype.UUID, partitionDate PartitionDate, inlineStoreTTL *time.Duration, externalCutoverBatchSize, externalCutoverNumConcurrentOffloads int32) error {
     ctx, span := telemetry.NewSpan(ctx, "olap_repository.processSinglePartition")
     defer span.End()

@@ -2940,7 +2985,7 @@ func (p *OLAPRepositoryImpl) processSinglePartition(ctx context.Context, process
     pagination := jobMeta.Pagination

     for {
-        outcome, err := p.processOLAPPayloadCutoverBatch(ctx, processId, partitionDate, pagination)
+        outcome, err := p.processOLAPPayloadCutoverBatch(ctx, processId, partitionDate, pagination, externalCutoverBatchSize, externalCutoverNumConcurrentOffloads)

         if err != nil {
             return fmt.Errorf("failed to process payload cutover batch: %w", err)
@@ -2993,7 +3038,7 @@ func (p *OLAPRepositoryImpl) processSinglePartition(ctx context.Context, process
     return nil
 }

-func (p *OLAPRepositoryImpl) ProcessOLAPPayloadCutovers(ctx context.Context, externalStoreEnabled bool, inlineStoreTTL *time.Duration, externalCutoverBatchSize int32) error {
+func (p *OLAPRepositoryImpl) ProcessOLAPPayloadCutovers(ctx context.Context, externalStoreEnabled bool, inlineStoreTTL *time.Duration, externalCutoverBatchSize, externalCutoverNumConcurrentOffloads int32) error {
     if !externalStoreEnabled {
         return nil
     }
@@ -3020,7 +3065,7 @@ func (p *OLAPRepositoryImpl) ProcessOLAPPayloadCutovers(ctx context.Context, ext
     for _, partition := range partitions {
         p.l.Info().Str("partition", partition.PartitionName).Msg("processing payload cutover for partition")

-        err = p.processSinglePartition(ctx, processId, PartitionDate(partition.PartitionDate), inlineStoreTTL, externalCutoverBatchSize)
+        err = p.processSinglePartition(ctx, processId, PartitionDate(partition.PartitionDate), inlineStoreTTL, externalCutoverBatchSize, externalCutoverNumConcurrentOffloads)

         if err != nil {
             return fmt.Errorf("failed to process partition %s: %w", partition.PartitionName, err)
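Both cutover rewrites in this diff (the OLAP one above and the payload-store one below) share the same concurrency shape: fan out one goroutine per chunk with `errgroup`, build results in goroutine-local maps, then merge into shared maps under a mutex. A minimal, runnable sketch of that pattern, with stand-in types in place of the repository's real queries and stores:

```go
package main

import (
	"fmt"
	"maps"
	"sync"

	"golang.org/x/sync/errgroup"
)

func main() {
	chunkStarts := []int{1, 1001, 2001} // boundary keys, as from the range-chunk queries

	var (
		mu          sync.Mutex
		merged      = make(map[int]string)
		numPayloads int
	)

	eg := errgroup.Group{}

	for _, start := range chunkStarts {
		start := start // capture the loop variable (pre-Go 1.22 idiom, as in the diff)
		eg.Go(func() error {
			// Each worker reads and offloads one chunk into a private map...
			local := map[int]string{start: fmt.Sprintf("external-key-%d", start)}

			// ...then merges under the shared mutex, exactly once per chunk.
			mu.Lock()
			maps.Copy(merged, local)
			numPayloads += len(local)
			mu.Unlock()

			return nil
		})
	}

	if err := eg.Wait(); err != nil {
		panic(err)
	}

	fmt.Println(len(merged), numPayloads)
}
```

Keeping the per-chunk maps goroutine-local and taking the lock only for the merge keeps the critical section short, which is presumably why the diff structures the workers this way.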
diff --git a/pkg/repository/v1/payloadstore.go b/pkg/repository/v1/payloadstore.go
index 358c2370f..37e0444a0 100644
--- a/pkg/repository/v1/payloadstore.go
+++ b/pkg/repository/v1/payloadstore.go
@@ -4,7 +4,9 @@ import (
     "context"
     "errors"
     "fmt"
+    "maps"
     "sort"
+    "sync"
     "time"

     "github.com/google/uuid"
@@ -16,6 +18,7 @@ import (
     "github.com/jackc/pgx/v5/pgxpool"
     "github.com/rs/zerolog"
     "go.opentelemetry.io/otel/attribute"
+    "golang.org/x/sync/errgroup"
 )

 type StorePayloadOpts struct {
@@ -69,36 +72,39 @@ type PayloadStoreRepository interface {
     ExternalCutoverProcessInterval() time.Duration
     InlineStoreTTL() *time.Duration
     ExternalCutoverBatchSize() int32
+    ExternalCutoverNumConcurrentOffloads() int32
     ExternalStoreEnabled() bool
     ExternalStore() ExternalStore

     ProcessPayloadCutovers(ctx context.Context) error
 }

 type payloadStoreRepositoryImpl struct {
-    pool                             *pgxpool.Pool
-    l                                *zerolog.Logger
-    queries                          *sqlcv1.Queries
-    externalStoreEnabled             bool
-    inlineStoreTTL                   *time.Duration
-    externalStore                    ExternalStore
-    enablePayloadDualWrites          bool
-    enableTaskEventPayloadDualWrites bool
-    enableDagDataPayloadDualWrites   bool
-    enableOLAPPayloadDualWrites      bool
-    externalCutoverProcessInterval   time.Duration
-    externalCutoverBatchSize         int32
-    enableImmediateOffloads          bool
+    pool                                 *pgxpool.Pool
+    l                                    *zerolog.Logger
+    queries                              *sqlcv1.Queries
+    externalStoreEnabled                 bool
+    inlineStoreTTL                       *time.Duration
+    externalStore                        ExternalStore
+    enablePayloadDualWrites              bool
+    enableTaskEventPayloadDualWrites     bool
+    enableDagDataPayloadDualWrites       bool
+    enableOLAPPayloadDualWrites          bool
+    externalCutoverProcessInterval       time.Duration
+    externalCutoverBatchSize             int32
+    externalCutoverNumConcurrentOffloads int32
+    enableImmediateOffloads              bool
 }

 type PayloadStoreRepositoryOpts struct {
-    EnablePayloadDualWrites          bool
-    EnableTaskEventPayloadDualWrites bool
-    EnableDagDataPayloadDualWrites   bool
-    EnableOLAPPayloadDualWrites      bool
-    ExternalCutoverProcessInterval   time.Duration
-    ExternalCutoverBatchSize         int32
-    InlineStoreTTL                   *time.Duration
-    EnableImmediateOffloads          bool
+    EnablePayloadDualWrites              bool
+    EnableTaskEventPayloadDualWrites     bool
+    EnableDagDataPayloadDualWrites       bool
+    EnableOLAPPayloadDualWrites          bool
+    ExternalCutoverProcessInterval       time.Duration
+    ExternalCutoverBatchSize             int32
+    ExternalCutoverNumConcurrentOffloads int32
+    InlineStoreTTL                       *time.Duration
+    EnableImmediateOffloads              bool
 }

 func NewPayloadStoreRepository(
@@ -112,16 +118,17 @@ func NewPayloadStoreRepository(
         l:       l,
         queries: queries,

-        externalStoreEnabled:             false,
-        inlineStoreTTL:                   opts.InlineStoreTTL,
-        externalStore:                    &NoOpExternalStore{},
-        enablePayloadDualWrites:          opts.EnablePayloadDualWrites,
-        enableTaskEventPayloadDualWrites: opts.EnableTaskEventPayloadDualWrites,
-        enableDagDataPayloadDualWrites:   opts.EnableDagDataPayloadDualWrites,
-        enableOLAPPayloadDualWrites:      opts.EnableOLAPPayloadDualWrites,
-        externalCutoverProcessInterval:   opts.ExternalCutoverProcessInterval,
-        externalCutoverBatchSize:         opts.ExternalCutoverBatchSize,
-        enableImmediateOffloads:          opts.EnableImmediateOffloads,
+        externalStoreEnabled:                 false,
+        inlineStoreTTL:                       opts.InlineStoreTTL,
+        externalStore:                        &NoOpExternalStore{},
+        enablePayloadDualWrites:              opts.EnablePayloadDualWrites,
+        enableTaskEventPayloadDualWrites:     opts.EnableTaskEventPayloadDualWrites,
+        enableDagDataPayloadDualWrites:       opts.EnableDagDataPayloadDualWrites,
+        enableOLAPPayloadDualWrites:          opts.EnableOLAPPayloadDualWrites,
+        externalCutoverProcessInterval:       opts.ExternalCutoverProcessInterval,
+        externalCutoverBatchSize:             opts.ExternalCutoverBatchSize,
+        externalCutoverNumConcurrentOffloads: opts.ExternalCutoverNumConcurrentOffloads,
+        enableImmediateOffloads:              opts.EnableImmediateOffloads,
     }
 }

@@ -386,6 +393,10 @@ func (p *payloadStoreRepositoryImpl) ExternalCutoverBatchSize() int32 {
     return p.externalCutoverBatchSize
 }

+func (p *payloadStoreRepositoryImpl) ExternalCutoverNumConcurrentOffloads() int32 {
+    return p.externalCutoverNumConcurrentOffloads
+}
+
 func (p *payloadStoreRepositoryImpl) ExternalStoreEnabled() bool {
     return p.externalStoreEnabled
 }
@@ -427,65 +438,102 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Cont
     ctx, span := telemetry.NewSpan(ctx, "PayloadStoreRepository.ProcessPayloadCutoverBatch")
     defer span.End()

-    tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l, 10000)
-
-    if err != nil {
-        return nil, fmt.Errorf("failed to prepare transaction for copying offloaded payloads: %w", err)
-    }
-
-    defer rollback()
-
     tableName := fmt.Sprintf("v1_payload_offload_tmp_%s", partitionDate.String())

-    payloads, err := p.queries.ListPaginatedPayloadsForOffload(ctx, tx, sqlcv1.ListPaginatedPayloadsForOffloadParams{
+    windowSize := p.externalCutoverBatchSize * p.externalCutoverNumConcurrentOffloads
+
+    payloadRanges, err := p.queries.CreatePayloadRangeChunks(ctx, p.pool, sqlcv1.CreatePayloadRangeChunksParams{
+        Chunksize:      p.externalCutoverBatchSize,
         Partitiondate:  pgtype.Date(partitionDate),
-        Limitparam:     p.externalCutoverBatchSize,
+        Windowsize:     windowSize,
         Lasttenantid:   pagination.LastTenantID,
         Lastinsertedat: pagination.LastInsertedAt,
         Lastid:         pagination.LastID,
         Lasttype:       pagination.LastType,
     })

-    if err != nil {
-        return nil, fmt.Errorf("failed to list payloads for offload: %w", err)
+    if err != nil && !errors.Is(err, pgx.ErrNoRows) {
+        return nil, fmt.Errorf("failed to create payload range chunks: %w", err)
     }

-    span.SetAttributes(attribute.Int("num_payloads_read", len(payloads)))
+    if errors.Is(err, pgx.ErrNoRows) {
+        return &CutoverBatchOutcome{
+            ShouldContinue: false,
+            NextPagination: pagination,
+        }, nil
+    }

-    alreadyExternalPayloads := make(map[PayloadExternalId]ExternalPayloadLocationKey)
+    eg := errgroup.Group{}
+    mu := sync.Mutex{}
+
+    externalIdToKey := make(map[PayloadExternalId]ExternalPayloadLocationKey)
     externalIdToPayload := make(map[PayloadExternalId]sqlcv1.ListPaginatedPayloadsForOffloadRow)
-    offloadOpts := make([]OffloadToExternalStoreOpts, 0, len(payloads))
+    numPayloads := 0

-    for _, payload := range payloads {
-        externalId := PayloadExternalId(payload.ExternalID.String())
-
-        if externalId == "" {
-            externalId = PayloadExternalId(uuid.NewString())
-        }
-
-        externalIdToPayload[externalId] = *payload
-        if payload.Location != sqlcv1.V1PayloadLocationINLINE {
-            alreadyExternalPayloads[externalId] = ExternalPayloadLocationKey(payload.ExternalLocationKey)
-        } else {
-            offloadOpts = append(offloadOpts, OffloadToExternalStoreOpts{
-                TenantId:   TenantID(payload.TenantID.String()),
-                ExternalID: externalId,
-                InsertedAt: payload.InsertedAt,
-                Payload:    payload.InlineContent,
+    for _, payloadRange := range payloadRanges {
+        pr := payloadRange
+        eg.Go(func() error {
+            payloads, err := p.queries.ListPaginatedPayloadsForOffload(ctx, p.pool, sqlcv1.ListPaginatedPayloadsForOffloadParams{
+                Partitiondate:  pgtype.Date(partitionDate),
+                Limitparam:     p.externalCutoverBatchSize,
+                Lasttenantid:   pr.TenantID,
+                Lastinsertedat: pr.InsertedAt,
+                Lastid:         pr.ID,
+                Lasttype:       pr.Type,
             })
-        }
+
+            if err != nil {
+                return fmt.Errorf("failed to list paginated payloads for offload: %w", err)
+            }
+
+            alreadyExternalPayloads := make(map[PayloadExternalId]ExternalPayloadLocationKey)
+            externalIdToPayloadInner := make(map[PayloadExternalId]sqlcv1.ListPaginatedPayloadsForOffloadRow)
+            offloadOpts := make([]OffloadToExternalStoreOpts, 0, len(payloads))
+
+            for _, payload := range payloads {
+                externalId := PayloadExternalId(payload.ExternalID.String())
+
+                if externalId == "" {
+                    externalId = PayloadExternalId(uuid.NewString())
+                }
+
+                externalIdToPayloadInner[externalId] = *payload
+                if payload.Location != sqlcv1.V1PayloadLocationINLINE {
+                    alreadyExternalPayloads[externalId] = ExternalPayloadLocationKey(payload.ExternalLocationKey)
+                } else {
+                    offloadOpts = append(offloadOpts, OffloadToExternalStoreOpts{
+                        TenantId:   TenantID(payload.TenantID.String()),
+                        ExternalID: externalId,
+                        InsertedAt: payload.InsertedAt,
+                        Payload:    payload.InlineContent,
+                    })
+                }
+            }
+
+            externalIdToKeyInner, err := p.ExternalStore().Store(ctx, offloadOpts...)
+
+            if err != nil {
+                return fmt.Errorf("failed to offload payloads to external store: %w", err)
+            }
+
+            mu.Lock()
+            maps.Copy(externalIdToKey, externalIdToKeyInner)
+            maps.Copy(externalIdToKey, alreadyExternalPayloads)
+            maps.Copy(externalIdToPayload, externalIdToPayloadInner)
+            numPayloads += len(payloads)
+            mu.Unlock()
+
+            return nil
+        })
     }

-    externalIdToKey, err := p.ExternalStore().Store(ctx, offloadOpts...)
+    err = eg.Wait()

     if err != nil {
-        return nil, fmt.Errorf("failed to offload payloads to external store: %w", err)
+        return nil, err
     }

-    for r, k := range alreadyExternalPayloads {
-        externalIdToKey[r] = k
-    }
-
-    payloadsToInsert := make([]sqlcv1.CutoverPayloadToInsert, 0, len(payloads))
+    span.SetAttributes(attribute.Int("num_payloads_read", numPayloads))
+    payloadsToInsert := make([]sqlcv1.CutoverPayloadToInsert, 0, numPayloads)

     for externalId, key := range externalIdToKey {
         payload := externalIdToPayload[externalId]
@@ -499,27 +547,33 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Cont
         })
     }

+    tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, p.pool, p.l, 10000)
+
+    if err != nil {
+        return nil, fmt.Errorf("failed to prepare transaction for copying offloaded payloads: %w", err)
+    }
+
+    defer rollback()
+
     inserted, err := sqlcv1.InsertCutOverPayloadsIntoTempTable(ctx, tx, tableName, payloadsToInsert)

     if err != nil && !errors.Is(err, pgx.ErrNoRows) {
         return nil, fmt.Errorf("failed to copy offloaded payloads into temp table: %w", err)
     }

-    isNoRows := errors.Is(err, pgx.ErrNoRows)
+    if errors.Is(err, pgx.ErrNoRows) {
+        return &CutoverBatchOutcome{
+            ShouldContinue: false,
+            NextPagination: pagination,
+        }, nil
+    }

-    params := PaginationParams{
+    extendedLease, err := p.acquireOrExtendJobLease(ctx, tx, processId, partitionDate, PaginationParams{
         LastTenantID:   inserted.TenantId,
         LastInsertedAt: inserted.InsertedAt,
         LastID:         inserted.ID,
         LastType:       inserted.Type,
-    }
-
-    // hack so that we don't have errors from zero values when no rows are returned
-    if isNoRows {
-        params = pagination
-    }
-
-    extendedLease, err := p.acquireOrExtendJobLease(ctx, tx, processId, partitionDate, params)
+    })

     if err != nil {
         return nil, fmt.Errorf("failed to extend cutover job lease: %w", err)
@@ -529,7 +583,7 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Cont
         return nil, fmt.Errorf("failed to commit copy offloaded payloads transaction: %w", err)
     }

-    if len(payloads) < int(p.externalCutoverBatchSize) || isNoRows {
+    if numPayloads < int(windowSize) {
         return &CutoverBatchOutcome{
             ShouldContinue: false,
             NextPagination: extendedLease.Pagination,
diff --git a/pkg/repository/v1/sqlcv1/olap.sql b/pkg/repository/v1/sqlcv1/olap.sql
index c2f1aff96..09c9f2e32 100644
--- a/pkg/repository/v1/sqlcv1/olap.sql
+++ b/pkg/repository/v1/sqlcv1/olap.sql
@@ -1867,6 +1867,33 @@ SELECT
     updated_at::TIMESTAMPTZ
 FROM payloads;

+-- name: CreateOLAPPayloadRangeChunks :many
+WITH payloads AS (
+    SELECT
+        (p).*
+    FROM list_paginated_olap_payloads_for_offload(
+        @partitionDate::DATE,
+        @windowSize::INTEGER,
+        @lastTenantId::UUID,
+        @lastExternalId::UUID,
+        @lastInsertedAt::TIMESTAMPTZ
+    ) p
+), with_rows AS (
+    SELECT
+        tenant_id::UUID,
+        external_id::UUID,
+        inserted_at::TIMESTAMPTZ,
+        ROW_NUMBER() OVER (ORDER BY tenant_id, external_id, inserted_at) AS rn
+    FROM payloads
+)
+
+SELECT *
+FROM with_rows
+-- row numbers are one-indexed
+WHERE MOD(rn, @chunkSize::INTEGER) = 1
+ORDER BY tenant_id, external_id, inserted_at
+;
+
 -- name: CreateV1PayloadOLAPCutoverTemporaryTable :exec
 SELECT copy_v1_payloads_olap_partition_structure(@date::DATE);
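The `WHERE MOD(rn, @chunkSize) = 1` filter in the query above is what turns a window of rows into chunk boundaries: with one-indexed row numbers it keeps rows 1, chunkSize+1, 2*chunkSize+1, and so on, i.e. the first row of each chunk. A small sketch of that selection, assuming a chunk size greater than one:

```go
package main

import "fmt"

// chunkStarts reproduces the WHERE MOD(rn, chunkSize) = 1 filter: with
// one-indexed row numbers it keeps rows 1, chunkSize+1, 2*chunkSize+1, ...
func chunkStarts(windowSize, chunkSize int) []int {
	var starts []int
	for rn := 1; rn <= windowSize; rn++ {
		if rn%chunkSize == 1 {
			starts = append(starts, rn)
		}
	}
	return starts
}

func main() {
	fmt.Println(chunkStarts(10, 3)) // [1 4 7 10]: the first row of each 3-row chunk
}
```

Each of these boundary rows is then handed to one worker as the seed of its inclusive keyset scan.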
diff --git a/pkg/repository/v1/sqlcv1/olap.sql.go b/pkg/repository/v1/sqlcv1/olap.sql.go
index ce6795524..743e0c8ea 100644
--- a/pkg/repository/v1/sqlcv1/olap.sql.go
+++ b/pkg/repository/v1/sqlcv1/olap.sql.go
@@ -322,6 +322,81 @@ func (q *Queries) CreateOLAPPartitions(ctx context.Context, db DBTX, arg CreateO
     return err
 }

+const createOLAPPayloadRangeChunks = `-- name: CreateOLAPPayloadRangeChunks :many
+WITH payloads AS (
+    SELECT
+        (p).*
+    FROM list_paginated_olap_payloads_for_offload(
+        $2::DATE,
+        $3::INTEGER,
+        $4::UUID,
+        $5::UUID,
+        $6::TIMESTAMPTZ
+    ) p
+), with_rows AS (
+    SELECT
+        tenant_id::UUID,
+        external_id::UUID,
+        inserted_at::TIMESTAMPTZ,
+        ROW_NUMBER() OVER (ORDER BY tenant_id, external_id, inserted_at) AS rn
+    FROM payloads
+)
+
+SELECT tenant_id, external_id, inserted_at, rn
+FROM with_rows
+WHERE MOD(rn, $1::INTEGER) = 1
+ORDER BY tenant_id, external_id, inserted_at
+`
+
+type CreateOLAPPayloadRangeChunksParams struct {
+    Chunksize      int32              `json:"chunksize"`
+    Partitiondate  pgtype.Date        `json:"partitiondate"`
+    Windowsize     int32              `json:"windowsize"`
+    Lasttenantid   pgtype.UUID        `json:"lasttenantid"`
+    Lastexternalid pgtype.UUID        `json:"lastexternalid"`
+    Lastinsertedat pgtype.Timestamptz `json:"lastinsertedat"`
+}
+
+type CreateOLAPPayloadRangeChunksRow struct {
+    TenantID   pgtype.UUID        `json:"tenant_id"`
+    ExternalID pgtype.UUID        `json:"external_id"`
+    InsertedAt pgtype.Timestamptz `json:"inserted_at"`
+    Rn         int64              `json:"rn"`
+}
+
+// row numbers are one-indexed
+func (q *Queries) CreateOLAPPayloadRangeChunks(ctx context.Context, db DBTX, arg CreateOLAPPayloadRangeChunksParams) ([]*CreateOLAPPayloadRangeChunksRow, error) {
+    rows, err := db.Query(ctx, createOLAPPayloadRangeChunks,
+        arg.Chunksize,
+        arg.Partitiondate,
+        arg.Windowsize,
+        arg.Lasttenantid,
+        arg.Lastexternalid,
+        arg.Lastinsertedat,
+    )
+    if err != nil {
+        return nil, err
+    }
+    defer rows.Close()
+    var items []*CreateOLAPPayloadRangeChunksRow
+    for rows.Next() {
+        var i CreateOLAPPayloadRangeChunksRow
+        if err := rows.Scan(
+            &i.TenantID,
+            &i.ExternalID,
+            &i.InsertedAt,
+            &i.Rn,
+        ); err != nil {
+            return nil, err
+        }
+        items = append(items, &i)
+    }
+    if err := rows.Err(); err != nil {
+        return nil, err
+    }
+    return items, nil
+}
+
 type CreateTaskEventsOLAPParams struct {
     TenantID pgtype.UUID `json:"tenant_id"`
     TaskID   int64       `json:"task_id"`
diff --git a/pkg/repository/v1/sqlcv1/payload-store.sql b/pkg/repository/v1/sqlcv1/payload-store.sql
index 7e8aa722b..5ca429f51 100644
--- a/pkg/repository/v1/sqlcv1/payload-store.sql
+++ b/pkg/repository/v1/sqlcv1/payload-store.sql
@@ -241,6 +241,35 @@ SELECT
     updated_at::TIMESTAMPTZ
 FROM payloads;

+-- name: CreatePayloadRangeChunks :many
+WITH payloads AS (
+    SELECT
+        (p).*
+    FROM list_paginated_payloads_for_offload(
+        @partitionDate::DATE,
+        @windowSize::INTEGER,
+        @lastTenantId::UUID,
+        @lastInsertedAt::TIMESTAMPTZ,
+        @lastId::BIGINT,
+        @lastType::v1_payload_type
+    ) p
+), with_rows AS (
+    SELECT
+        tenant_id::UUID,
+        id::BIGINT,
+        inserted_at::TIMESTAMPTZ,
+        type::v1_payload_type,
+        ROW_NUMBER() OVER (ORDER BY tenant_id, inserted_at, id, type) AS rn
+    FROM payloads
+)
+
+SELECT *
+FROM with_rows
+-- row numbers are one-indexed
+WHERE MOD(rn, @chunkSize::INTEGER) = 1
+ORDER BY tenant_id, inserted_at, id, type
+;
+
 -- name: CreateV1PayloadCutoverTemporaryTable :exec
 SELECT copy_v1_payload_partition_structure(@date::DATE);
diff --git a/pkg/repository/v1/sqlcv1/payload-store.sql.go b/pkg/repository/v1/sqlcv1/payload-store.sql.go
index ee2081059..df80f9547 100644
--- a/pkg/repository/v1/sqlcv1/payload-store.sql.go
+++ b/pkg/repository/v1/sqlcv1/payload-store.sql.go
@@ -107,6 +107,87 @@ func (q *Queries) AnalyzeV1Payload(ctx context.Context, db DBTX) error {
     return err
 }

+const createPayloadRangeChunks = `-- name: CreatePayloadRangeChunks :many
+WITH payloads AS (
+    SELECT
+        (p).*
+    FROM list_paginated_payloads_for_offload(
+        $2::DATE,
+        $3::INTEGER,
+        $4::UUID,
+        $5::TIMESTAMPTZ,
+        $6::BIGINT,
+        $7::v1_payload_type
+    ) p
+), with_rows AS (
+    SELECT
+        tenant_id::UUID,
+        id::BIGINT,
+        inserted_at::TIMESTAMPTZ,
+        type::v1_payload_type,
+        ROW_NUMBER() OVER (ORDER BY tenant_id, inserted_at, id, type) AS rn
+    FROM payloads
+)
+
+SELECT tenant_id, id, inserted_at, type, rn
+FROM with_rows
+WHERE MOD(rn, $1::INTEGER) = 1
+ORDER BY tenant_id, inserted_at, id, type
+`
+
+type CreatePayloadRangeChunksParams struct {
+    Chunksize      int32              `json:"chunksize"`
+    Partitiondate  pgtype.Date        `json:"partitiondate"`
+    Windowsize     int32              `json:"windowsize"`
+    Lasttenantid   pgtype.UUID        `json:"lasttenantid"`
+    Lastinsertedat pgtype.Timestamptz `json:"lastinsertedat"`
+    Lastid         int64              `json:"lastid"`
+    Lasttype       V1PayloadType      `json:"lasttype"`
+}
+
+type CreatePayloadRangeChunksRow struct {
+    TenantID   pgtype.UUID        `json:"tenant_id"`
+    ID         int64              `json:"id"`
+    InsertedAt pgtype.Timestamptz `json:"inserted_at"`
+    Type       V1PayloadType      `json:"type"`
+    Rn         int64              `json:"rn"`
+}
+
+// row numbers are one-indexed
+func (q *Queries) CreatePayloadRangeChunks(ctx context.Context, db DBTX, arg CreatePayloadRangeChunksParams) ([]*CreatePayloadRangeChunksRow, error) {
+    rows, err := db.Query(ctx, createPayloadRangeChunks,
+        arg.Chunksize,
+        arg.Partitiondate,
+        arg.Windowsize,
+        arg.Lasttenantid,
+        arg.Lastinsertedat,
+        arg.Lastid,
+        arg.Lasttype,
+    )
+    if err != nil {
+        return nil, err
+    }
+    defer rows.Close()
+    var items []*CreatePayloadRangeChunksRow
+    for rows.Next() {
+        var i CreatePayloadRangeChunksRow
+        if err := rows.Scan(
+            &i.TenantID,
+            &i.ID,
+            &i.InsertedAt,
+            &i.Type,
+            &i.Rn,
+        ); err != nil {
+            return nil, err
+        }
+        items = append(items, &i)
+    }
+    if err := rows.Err(); err != nil {
+        return nil, err
+    }
+    return items, nil
+}
+
 const createV1PayloadCutoverTemporaryTable = `-- name: CreateV1PayloadCutoverTemporaryTable :exec
 SELECT copy_v1_payload_partition_structure($1::DATE)
 `
diff --git a/sql/schema/v1-core.sql b/sql/schema/v1-core.sql
index 01f14d177..e0fe6adf2 100644
--- a/sql/schema/v1-core.sql
+++ b/sql/schema/v1-core.sql
@@ -1916,7 +1916,7 @@ BEGIN
         SELECT tenant_id, id, inserted_at, external_id, type, location,
               external_location_key, inline_content, updated_at
         FROM %I
-        WHERE (tenant_id, inserted_at, id, type) > ($1, $2, $3, $4)
+        WHERE (tenant_id, inserted_at, id, type) >= ($1, $2, $3, $4)
         ORDER BY tenant_id, inserted_at, id, type
         LIMIT $5
     ', source_partition_name);
diff --git a/sql/schema/v1-olap.sql b/sql/schema/v1-olap.sql
index b74709c5f..967467fef 100644
--- a/sql/schema/v1-olap.sql
+++ b/sql/schema/v1-olap.sql
@@ -954,7 +954,7 @@ BEGIN
     query := format('
         SELECT tenant_id, external_id, location, external_location_key, inline_content, inserted_at, updated_at
         FROM %I
-        WHERE (tenant_id, external_id, inserted_at) > ($1, $2, $3)
+        WHERE (tenant_id, external_id, inserted_at) >= ($1, $2, $3)
         ORDER BY tenant_id, external_id, inserted_at
         LIMIT $4
     ', source_partition_name);
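The two one-character schema changes that close the diff (`>` to `>=`) are what make the chunked readers correct: each worker is seeded with the first row of its chunk, so an exclusive comparison would silently drop that boundary row from every chunk. A small worked example, with integer keys standing in for the real cursor tuples:

```go
package main

import "fmt"

// page returns up to limit keys at or after (inclusive) / strictly after
// (exclusive) the cursor, mimicking the keyset WHERE clause.
func page(keys []int, from, limit int, inclusive bool) []int {
	var out []int
	for _, k := range keys {
		if (inclusive && k >= from) || (!inclusive && k > from) {
			out = append(out, k)
			if len(out) == limit {
				break
			}
		}
	}
	return out
}

func main() {
	keys := []int{1, 2, 3, 4, 5, 6} // a window; chunk size 3, boundaries at keys 1 and 4
	fmt.Println(page(keys, 4, 3, false)) // [5 6]:   exclusive, boundary row 4 is lost
	fmt.Println(page(keys, 4, 3, true))  // [4 5 6]: inclusive, boundary row kept
}
```

The old single-reader loop paginated from the *last* row of the previous page, where exclusive `>` was correct; the new fan-out paginates from the *first* row of each chunk, so the comparison had to become inclusive.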