diff --git a/cmd/server/main.go b/cmd/server/main.go index c470d492..05d92999 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -239,7 +239,8 @@ func run(ctx context.Context, cfg common.ConfigStore, stderr io.Writer, listener CheckInterval: cfg.Get(common.HealthCheckIntervalKey), Metrics: metrics, } - jobs := maintenance.NewJobs(businessDB) + jobConcurrency := config.AsInt(cfg.Get(common.MaintenanceJobConcurrencyKey), 2) + jobs := maintenance.NewJobs(businessDB, jobConcurrency) updateConfigFunc := func(ctx context.Context) { cfg.Update(ctx) diff --git a/pkg/common/config.go b/pkg/common/config.go index 4bb55b48..5fdf0bcc 100644 --- a/pkg/common/config.go +++ b/pkg/common/config.go @@ -45,6 +45,7 @@ const ( CountryCodeHeaderKey EnterpriseAuditLogDaysKey ClickHouseOptionalKey + MaintenanceJobConcurrencyKey // Add new fields _above_ COMMON_CONFIG_KEYS_COUNT ) diff --git a/pkg/config/env.go b/pkg/config/env.go index bb621881..a8e71067 100644 --- a/pkg/config/env.go +++ b/pkg/config/env.go @@ -77,6 +77,7 @@ func init() { configKeyToEnvName[common.CountryCodeHeaderKey] = "PC_COUNTRY_CODE_HEADER" configKeyToEnvName[common.EnterpriseAuditLogDaysKey] = "EE_AUDIT_LOGS_DAYS" configKeyToEnvName[common.ClickHouseOptionalKey] = "PC_CLICKHOUSE_OPTIONAL" + configKeyToEnvName[common.MaintenanceJobConcurrencyKey] = "PC_MAINTENANCE_JOB_CONCURRENCY" for i, v := range configKeyToEnvName { if len(v) == 0 { diff --git a/pkg/maintenance/jobs.go b/pkg/maintenance/jobs.go index e3221e8e..9d4c38c6 100644 --- a/pkg/maintenance/jobs.go +++ b/pkg/maintenance/jobs.go @@ -7,18 +7,22 @@ import ( "io" "log/slog" "net/http" - "sync" "time" "github.com/PrivateCaptcha/PrivateCaptcha/pkg/common" "github.com/PrivateCaptcha/PrivateCaptcha/pkg/db" + "golang.org/x/sync/semaphore" ) -func NewJobs(store db.Implementor) *jobs { +func NewJobs(store db.Implementor, concurrency int) *jobs { + if concurrency < 1 { + concurrency = 1 + } j := &jobs{ store: store, periodicJobs: make([]common.PeriodicJob, 0), oneOffJobs: make([]common.OneOffJob, 0), + sem: semaphore.NewWeighted(int64(concurrency)), } j.maintenanceCtx, j.maintenanceCancel = context.WithCancel( @@ -34,7 +38,7 @@ type jobs struct { maintenanceCancel context.CancelFunc maintenanceCtx context.Context apiKey string - mux sync.Mutex + sem *semaphore.Weighted } // Implicit logic is that lockDuration is the actual job Interval, but it is defined by the SQL lock. @@ -67,15 +71,15 @@ func (j *jobs) Spawn(job common.PeriodicJob) { func (j *jobs) RunAll() { slog.DebugContext(j.maintenanceCtx, "Starting maintenance jobs", "periodic", len(j.periodicJobs), "oneoff", len(j.oneOffJobs)) - // NOTE: we run jobs mutually exclusive to preserve resources for main server (those are _maintenance_ jobs anyways) + // NOTE: we limit concurrent jobs with semaphore to preserve resources for main server (those are _maintenance_ jobs anyways) // NOTE 2: this does not apply for on-demand ones below - that's why we wrap them only here, unlike AddLocked() for _, job := range j.periodicJobs { - go common.RunPeriodicJob(j.maintenanceCtx, &mutexPeriodicJob{job: job, mux: &j.mux}) + go common.RunPeriodicJob(j.maintenanceCtx, &semaphorePeriodicJob{job: job, sem: j.sem}) } for _, job := range j.oneOffJobs { - go common.RunOneOffJob(j.maintenanceCtx, &mutexOneOffJob{job: job, mux: &j.mux}, job.NewParams()) + go common.RunOneOffJob(j.maintenanceCtx, &semaphoreOneOffJob{job: job, sem: j.sem}, job.NewParams()) } } diff --git a/pkg/maintenance/jobs_test.go b/pkg/maintenance/jobs_test.go index 2be6ed02..ccf249f2 100644 --- a/pkg/maintenance/jobs_test.go +++ b/pkg/maintenance/jobs_test.go @@ -80,7 +80,7 @@ func (j *stubPeriodicJob) wasExecuted() bool { } func TestOneOffJobExecution(t *testing.T) { - jobsManager := NewJobs(nil) + jobsManager := NewJobs(nil, 2) defer jobsManager.Shutdown() stubJob := &stubOneOffJob{} @@ -97,7 +97,7 @@ func TestOneOffJobExecution(t *testing.T) { } func TestPeriodicJobExecution(t *testing.T) { - jobsManager := NewJobs(nil) + jobsManager := NewJobs(nil, 2) defer jobsManager.Shutdown() stubJob := &stubPeriodicJob{ @@ -116,7 +116,7 @@ func TestPeriodicJobExecution(t *testing.T) { } func TestJobsSetup(t *testing.T) { - jobsManager := NewJobs(nil) + jobsManager := NewJobs(nil, 2) defer jobsManager.Shutdown() mux := http.NewServeMux() @@ -131,7 +131,7 @@ func TestJobsSetup(t *testing.T) { } func TestHandlePeriodicJobWithAPIKey(t *testing.T) { - jobsManager := NewJobs(nil) + jobsManager := NewJobs(nil, 2) defer jobsManager.Shutdown() stubJob := &stubPeriodicJob{ @@ -159,7 +159,7 @@ func TestHandlePeriodicJobWithAPIKey(t *testing.T) { } func TestHandlePeriodicJobNoAPIKey(t *testing.T) { - jobsManager := NewJobs(nil) + jobsManager := NewJobs(nil, 2) defer jobsManager.Shutdown() mux := http.NewServeMux() @@ -177,7 +177,7 @@ func TestHandlePeriodicJobNoAPIKey(t *testing.T) { } func TestHandlePeriodicJobWrongAPIKey(t *testing.T) { - jobsManager := NewJobs(nil) + jobsManager := NewJobs(nil, 2) defer jobsManager.Shutdown() mux := http.NewServeMux() @@ -196,7 +196,7 @@ func TestHandlePeriodicJobWrongAPIKey(t *testing.T) { } func TestHandlePeriodicJobNotFound(t *testing.T) { - jobsManager := NewJobs(nil) + jobsManager := NewJobs(nil, 2) defer jobsManager.Shutdown() mux := http.NewServeMux() @@ -215,7 +215,7 @@ func TestHandlePeriodicJobNotFound(t *testing.T) { } func TestHandleOneOffJobWithAPIKey(t *testing.T) { - jobsManager := NewJobs(nil) + jobsManager := NewJobs(nil, 2) defer jobsManager.Shutdown() stubJob := &stubOneOffJob{} @@ -241,7 +241,7 @@ func TestHandleOneOffJobWithAPIKey(t *testing.T) { } func TestHandleOneOffJobNotFound(t *testing.T) { - jobsManager := NewJobs(nil) + jobsManager := NewJobs(nil, 2) defer jobsManager.Shutdown() mux := http.NewServeMux() @@ -260,7 +260,7 @@ func TestHandleOneOffJobNotFound(t *testing.T) { } func TestSecurityMiddlewareNoConfiguredKey(t *testing.T) { - jobsManager := NewJobs(nil) + jobsManager := NewJobs(nil, 2) defer jobsManager.Shutdown() mux := http.NewServeMux() @@ -279,7 +279,7 @@ func TestSecurityMiddlewareNoConfiguredKey(t *testing.T) { } func TestJobsUpdateConfig(t *testing.T) { - jobsManager := NewJobs(nil) + jobsManager := NewJobs(nil, 2) defer jobsManager.Shutdown() cfg := config.NewBaseConfig(nil) @@ -293,7 +293,7 @@ func TestJobsUpdateConfig(t *testing.T) { } func TestJobsSpawn(t *testing.T) { - jobsManager := NewJobs(nil) + jobsManager := NewJobs(nil, 2) defer jobsManager.Shutdown() stubJob := &stubPeriodicJob{ diff --git a/pkg/maintenance/mutexjob.go b/pkg/maintenance/mutexjob.go deleted file mode 100644 index 021cb563..00000000 --- a/pkg/maintenance/mutexjob.go +++ /dev/null @@ -1,53 +0,0 @@ -package maintenance - -import ( - "context" - "log/slog" - "sync" - "time" - - "github.com/PrivateCaptcha/PrivateCaptcha/pkg/common" -) - -type mutexPeriodicJob struct { - job common.PeriodicJob - mux *sync.Mutex -} - -var _ common.PeriodicJob = (*mutexPeriodicJob)(nil) - -func (j *mutexPeriodicJob) Interval() time.Duration { return j.job.Interval() } -func (j *mutexPeriodicJob) Jitter() time.Duration { return j.job.Jitter() } -func (j *mutexPeriodicJob) Name() string { return j.job.Name() } -func (j *mutexPeriodicJob) NewParams() any { return j.job.NewParams() } -func (j *mutexPeriodicJob) Trigger() <-chan struct{} { return j.job.Trigger() } -func (j *mutexPeriodicJob) Timeout() time.Duration { return j.job.Timeout() } - -func (j *mutexPeriodicJob) RunOnce(ctx context.Context, params any) error { - slog.DebugContext(ctx, "About to acquire maintenance job mutex", "job", j.Name()) - - j.mux.Lock() - defer j.mux.Unlock() - - return j.job.RunOnce(ctx, params) -} - -type mutexOneOffJob struct { - job common.OneOffJob - mux *sync.Mutex -} - -var _ common.OneOffJob = (*mutexOneOffJob)(nil) - -func (j *mutexOneOffJob) Name() string { return j.job.Name() } -func (j *mutexOneOffJob) InitialPause() time.Duration { return j.job.InitialPause() } -func (j *mutexOneOffJob) NewParams() any { return j.job.NewParams() } - -func (j *mutexOneOffJob) RunOnce(ctx context.Context, params any) error { - slog.DebugContext(ctx, "About to acquire maintenance job mutex", "job", j.Name()) - - j.mux.Lock() - defer j.mux.Unlock() - - return j.job.RunOnce(ctx, params) -} diff --git a/pkg/maintenance/semaphorejob.go b/pkg/maintenance/semaphorejob.go new file mode 100644 index 00000000..6ae38aba --- /dev/null +++ b/pkg/maintenance/semaphorejob.go @@ -0,0 +1,57 @@ +package maintenance + +import ( + "context" + "log/slog" + "time" + + "github.com/PrivateCaptcha/PrivateCaptcha/pkg/common" + "golang.org/x/sync/semaphore" +) + +type semaphorePeriodicJob struct { + job common.PeriodicJob + sem *semaphore.Weighted +} + +var _ common.PeriodicJob = (*semaphorePeriodicJob)(nil) + +func (j *semaphorePeriodicJob) Interval() time.Duration { return j.job.Interval() } +func (j *semaphorePeriodicJob) Jitter() time.Duration { return j.job.Jitter() } +func (j *semaphorePeriodicJob) Name() string { return j.job.Name() } +func (j *semaphorePeriodicJob) NewParams() any { return j.job.NewParams() } +func (j *semaphorePeriodicJob) Trigger() <-chan struct{} { return j.job.Trigger() } +func (j *semaphorePeriodicJob) Timeout() time.Duration { return j.job.Timeout() } + +func (j *semaphorePeriodicJob) RunOnce(ctx context.Context, params any) error { + slog.DebugContext(ctx, "About to acquire maintenance job semaphore", "job", j.Name()) + + if err := j.sem.Acquire(ctx, 1); err != nil { + return err + } + defer j.sem.Release(1) + + return j.job.RunOnce(ctx, params) +} + +type semaphoreOneOffJob struct { + job common.OneOffJob + sem *semaphore.Weighted +} + +var _ common.OneOffJob = (*semaphoreOneOffJob)(nil) + +func (j *semaphoreOneOffJob) Name() string { return j.job.Name() } +func (j *semaphoreOneOffJob) InitialPause() time.Duration { return j.job.InitialPause() } +func (j *semaphoreOneOffJob) NewParams() any { return j.job.NewParams() } + +func (j *semaphoreOneOffJob) RunOnce(ctx context.Context, params any) error { + slog.DebugContext(ctx, "About to acquire maintenance job semaphore", "job", j.Name()) + + if err := j.sem.Acquire(ctx, 1); err != nil { + return err + } + defer j.sem.Release(1) + + return j.job.RunOnce(ctx, params) +}