From c6fd39b4e07d690fe98e097bbb21c23a50d0f157 Mon Sep 17 00:00:00 2001 From: Gabe Ruttner Date: Wed, 6 Aug 2025 05:42:07 -0700 Subject: [PATCH] fix: ProcessTaskTimeouts limit and timeout (#2087) * limit and timeout * right query * configurable * limit * env vars --- .../controllers/v1/task/controller.go | 2 +- pkg/config/loader/loader.go | 9 +++++- pkg/config/server/server.go | 23 +++++++++++++++ pkg/repository/v1/repository.go | 11 +++++-- pkg/repository/v1/task.go | 29 +++++++++++-------- 5 files changed, 58 insertions(+), 16 deletions(-) diff --git a/internal/services/controllers/v1/task/controller.go b/internal/services/controllers/v1/task/controller.go index 335782514..e5af383de 100644 --- a/internal/services/controllers/v1/task/controller.go +++ b/internal/services/controllers/v1/task/controller.go @@ -223,7 +223,7 @@ func New(fs ...TasksControllerOpt) (*TasksControllerImpl, error) { } jitter := t.opsPoolJitter - timeout := time.Second * 5 + timeout := time.Second * 30 t.timeoutTaskOperations = queueutils.NewOperationPool(opts.l, timeout, "timeout step runs", t.processTaskTimeouts).WithJitter(jitter) t.emitSleepOperations = queueutils.NewOperationPool(opts.l, timeout, "emit sleep step runs", t.processSleeps).WithJitter(jitter) diff --git a/pkg/config/loader/loader.go b/pkg/config/loader/loader.go index d2a5c4f37..a5a80991d 100644 --- a/pkg/config/loader/loader.go +++ b/pkg/config/loader/loader.go @@ -256,7 +256,14 @@ func (c *ConfigLoader) InitDataLayer() (res *database.Layer, err error) { return nil, fmt.Errorf("could not parse retention period %s: %w", scf.Runtime.Limits.DefaultTenantRetentionPeriod, err) } - v1, cleanupV1 := repov1.NewRepository(pool, &l, retentionPeriod, retentionPeriod, scf.Runtime.MaxInternalRetryCount, entitlementRepo) + taskLimits := repov1.TaskOperationLimits{ + TimeoutLimit: scf.Runtime.TaskOperationLimits.TimeoutLimit, + ReassignLimit: scf.Runtime.TaskOperationLimits.ReassignLimit, + RetryQueueLimit: scf.Runtime.TaskOperationLimits.RetryQueueLimit, + DurableSleepLimit: scf.Runtime.TaskOperationLimits.DurableSleepLimit, + } + + v1, cleanupV1 := repov1.NewRepository(pool, &l, retentionPeriod, retentionPeriod, scf.Runtime.MaxInternalRetryCount, entitlementRepo, taskLimits) apiRepo, cleanupApiRepo, err := postgresdb.NewAPIRepository(pool, &scf.Runtime, opts...) diff --git a/pkg/config/server/server.go b/pkg/config/server/server.go index 57df153e6..25babb649 100644 --- a/pkg/config/server/server.go +++ b/pkg/config/server/server.go @@ -103,6 +103,20 @@ type ConfigFileOperations struct { PollInterval int `mapstructure:"pollInterval" json:"pollInterval,omitempty" default:"2"` } +type TaskOperationLimitsConfigFile struct { + // TimeoutLimit is the limit for how many tasks to process in a single timeout operation + TimeoutLimit int `mapstructure:"timeoutLimit" json:"timeoutLimit,omitempty" default:"1000"` + + // ReassignLimit is the limit for how many tasks to process in a single reassignment operation + ReassignLimit int `mapstructure:"reassignLimit" json:"reassignLimit,omitempty" default:"1000"` + + // RetryQueueLimit is the limit for how many retry queue items to process in a single operation + RetryQueueLimit int `mapstructure:"retryQueueLimit" json:"retryQueueLimit,omitempty" default:"1000"` + + // DurableSleepLimit is the limit for how many durable sleep items to process in a single operation + DurableSleepLimit int `mapstructure:"durableSleepLimit" json:"durableSleepLimit,omitempty" default:"1000"` +} + // General server runtime options type ConfigFileRuntime struct { // Port is the port that the core server listens on @@ -230,6 +244,9 @@ type ConfigFileRuntime struct { // LogIngestionEnabled controls whether the server enables log ingestion for tasks LogIngestionEnabled bool `mapstructure:"logIngestionEnabled" json:"logIngestionEnabled,omitempty" default:"true"` + + // TaskOperationLimits controls the limits for various task operations + TaskOperationLimits TaskOperationLimitsConfigFile `mapstructure:"taskOperationLimits" json:"taskOperationLimits,omitempty"` } type InternalClientTLSConfigFile struct { @@ -811,4 +828,10 @@ func BindAllEnv(v *viper.Viper) { // operations options _ = v.BindEnv("olap.jitter", "SERVER_OPERATIONS_JITTER") _ = v.BindEnv("olap.pollInterval", "SERVER_OPERATIONS_POLL_INTERVAL") + + // task operation limits options + _ = v.BindEnv("taskOperationLimits.timeoutLimit", "SERVER_TASK_OPERATION_LIMITS_TIMEOUT_LIMIT") + _ = v.BindEnv("taskOperationLimits.reassignLimit", "SERVER_TASK_OPERATION_LIMITS_REASSIGN_LIMIT") + _ = v.BindEnv("taskOperationLimits.retryQueueLimit", "SERVER_TASK_OPERATION_LIMITS_RETRY_QUEUE_LIMIT") + _ = v.BindEnv("taskOperationLimits.durableSleepLimit", "SERVER_TASK_OPERATION_LIMITS_DURABLE_SLEEP_LIMIT") } diff --git a/pkg/repository/v1/repository.go b/pkg/repository/v1/repository.go index aa6be691e..3156e8f1f 100644 --- a/pkg/repository/v1/repository.go +++ b/pkg/repository/v1/repository.go @@ -10,6 +10,13 @@ import ( "github.com/rs/zerolog" ) +type TaskOperationLimits struct { + TimeoutLimit int + ReassignLimit int + RetryQueueLimit int + DurableSleepLimit int +} + type Repository interface { Triggers() TriggerRepository Tasks() TaskRepository @@ -40,7 +47,7 @@ type repositoryImpl struct { webhooks WebhookRepository } -func NewRepository(pool *pgxpool.Pool, l *zerolog.Logger, taskRetentionPeriod, olapRetentionPeriod time.Duration, maxInternalRetryCount int32, entitlements repository.EntitlementsRepository) (Repository, func() error) { +func NewRepository(pool *pgxpool.Pool, l *zerolog.Logger, taskRetentionPeriod, olapRetentionPeriod time.Duration, maxInternalRetryCount int32, entitlements repository.EntitlementsRepository, taskLimits TaskOperationLimits) (Repository, func() error) { v := validator.NewDefaultValidator() shared, cleanupShared := newSharedRepository(pool, v, l, entitlements) @@ -53,7 +60,7 @@ func NewRepository(pool *pgxpool.Pool, l *zerolog.Logger, taskRetentionPeriod, o impl := &repositoryImpl{ triggers: newTriggerRepository(shared), - tasks: newTaskRepository(shared, taskRetentionPeriod, maxInternalRetryCount), + tasks: newTaskRepository(shared, taskRetentionPeriod, maxInternalRetryCount, taskLimits.TimeoutLimit, taskLimits.ReassignLimit, taskLimits.RetryQueueLimit, taskLimits.DurableSleepLimit), scheduler: newSchedulerRepository(shared), matches: matchRepo, olap: newOLAPRepository(shared, olapRetentionPeriod, true), diff --git a/pkg/repository/v1/task.go b/pkg/repository/v1/task.go index 4fd5faafd..7e4e8c88a 100644 --- a/pkg/repository/v1/task.go +++ b/pkg/repository/v1/task.go @@ -255,13 +255,21 @@ type TaskRepositoryImpl struct { taskRetentionPeriod time.Duration maxInternalRetryCount int32 + timeoutLimit int + reassignLimit int + retryQueueLimit int + durableSleepLimit int } -func newTaskRepository(s *sharedRepository, taskRetentionPeriod time.Duration, maxInternalRetryCount int32) TaskRepository { +func newTaskRepository(s *sharedRepository, taskRetentionPeriod time.Duration, maxInternalRetryCount int32, timeoutLimit, reassignLimit, retryQueueLimit, durableSleepLimit int) TaskRepository { return &TaskRepositoryImpl{ sharedRepository: s, taskRetentionPeriod: taskRetentionPeriod, maxInternalRetryCount: maxInternalRetryCount, + timeoutLimit: timeoutLimit, + reassignLimit: reassignLimit, + retryQueueLimit: retryQueueLimit, + durableSleepLimit: durableSleepLimit, } } @@ -1042,7 +1050,7 @@ func (r *TaskRepositoryImpl) ListTaskMetas(ctx context.Context, tenantId string, } func (r *TaskRepositoryImpl) ProcessTaskTimeouts(ctx context.Context, tenantId string) (*TimeoutTasksResponse, bool, error) { - tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, r.pool, r.l, 5000) + tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, r.pool, r.l, 25000) if err != nil { return nil, false, err @@ -1050,8 +1058,7 @@ func (r *TaskRepositoryImpl) ProcessTaskTimeouts(ctx context.Context, tenantId s defer rollback() - // TODO: make limit configurable - limit := 1000 + limit := r.timeoutLimit // get task timeouts toTimeout, err := r.queries.ListTasksToTimeout(ctx, tx, sqlcv1.ListTasksToTimeoutParams{ @@ -1114,7 +1121,7 @@ func (r *TaskRepositoryImpl) ProcessTaskTimeouts(ctx context.Context, tenantId s } func (r *TaskRepositoryImpl) ProcessTaskReassignments(ctx context.Context, tenantId string) (*FailTasksResponse, bool, error) { - tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, r.pool, r.l, 5000) + tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, r.pool, r.l, 25000) if err != nil { return nil, false, err @@ -1122,8 +1129,7 @@ func (r *TaskRepositoryImpl) ProcessTaskReassignments(ctx context.Context, tenan defer rollback() - // TODO: make limit configurable - limit := 1000 + limit := r.reassignLimit toReassign, err := r.queries.ListTasksToReassign(ctx, tx, sqlcv1.ListTasksToReassignParams{ Tenantid: sqlchelpers.UUIDFromStr(tenantId), @@ -1178,7 +1184,7 @@ func (r *TaskRepositoryImpl) ProcessTaskReassignments(ctx context.Context, tenan } func (r *TaskRepositoryImpl) ProcessTaskRetryQueueItems(ctx context.Context, tenantId string) ([]*sqlcv1.V1RetryQueueItem, bool, error) { - tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, r.pool, r.l, 5000) + tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, r.pool, r.l, 25000) if err != nil { return nil, false, err @@ -1186,8 +1192,7 @@ func (r *TaskRepositoryImpl) ProcessTaskRetryQueueItems(ctx context.Context, ten defer rollback() - // TODO: make limit configurable - limit := 10000 + limit := r.retryQueueLimit // get task reassignments res, err := r.queries.ProcessRetryQueueItems(ctx, tx, sqlcv1.ProcessRetryQueueItemsParams{ @@ -1215,7 +1220,7 @@ type durableSleepEventData struct { } func (r *TaskRepositoryImpl) ProcessDurableSleeps(ctx context.Context, tenantId string) (*EventMatchResults, bool, error) { - tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, r.pool, r.l, 5000) + tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, r.pool, r.l, 25000) if err != nil { return nil, false, err @@ -1223,7 +1228,7 @@ func (r *TaskRepositoryImpl) ProcessDurableSleeps(ctx context.Context, tenantId defer rollback() - limit := 1000 + limit := r.durableSleepLimit emitted, err := r.queries.PopDurableSleep(ctx, tx, sqlcv1.PopDurableSleepParams{ TenantID: sqlchelpers.UUIDFromStr(tenantId),