mirror of
https://github.com/PrivateCaptcha/PrivateCaptcha.git
synced 2026-02-08 23:09:11 -06:00
Replace mutex with semaphore for maintenance job concurrency control (#275)
Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: ribtoks <505555+ribtoks@users.noreply.github.com>
This commit is contained in:
@@ -239,7 +239,8 @@ func run(ctx context.Context, cfg common.ConfigStore, stderr io.Writer, listener
|
||||
CheckInterval: cfg.Get(common.HealthCheckIntervalKey),
|
||||
Metrics: metrics,
|
||||
}
|
||||
jobs := maintenance.NewJobs(businessDB)
|
||||
jobConcurrency := config.AsInt(cfg.Get(common.MaintenanceJobConcurrencyKey), 2)
|
||||
jobs := maintenance.NewJobs(businessDB, jobConcurrency)
|
||||
|
||||
updateConfigFunc := func(ctx context.Context) {
|
||||
cfg.Update(ctx)
|
||||
|
||||
@@ -45,6 +45,7 @@ const (
|
||||
CountryCodeHeaderKey
|
||||
EnterpriseAuditLogDaysKey
|
||||
ClickHouseOptionalKey
|
||||
MaintenanceJobConcurrencyKey
|
||||
// Add new fields _above_
|
||||
COMMON_CONFIG_KEYS_COUNT
|
||||
)
|
||||
|
||||
@@ -77,6 +77,7 @@ func init() {
|
||||
configKeyToEnvName[common.CountryCodeHeaderKey] = "PC_COUNTRY_CODE_HEADER"
|
||||
configKeyToEnvName[common.EnterpriseAuditLogDaysKey] = "EE_AUDIT_LOGS_DAYS"
|
||||
configKeyToEnvName[common.ClickHouseOptionalKey] = "PC_CLICKHOUSE_OPTIONAL"
|
||||
configKeyToEnvName[common.MaintenanceJobConcurrencyKey] = "PC_MAINTENANCE_JOB_CONCURRENCY"
|
||||
|
||||
for i, v := range configKeyToEnvName {
|
||||
if len(v) == 0 {
|
||||
|
||||
@@ -7,18 +7,22 @@ import (
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/PrivateCaptcha/PrivateCaptcha/pkg/common"
|
||||
"github.com/PrivateCaptcha/PrivateCaptcha/pkg/db"
|
||||
"golang.org/x/sync/semaphore"
|
||||
)
|
||||
|
||||
func NewJobs(store db.Implementor) *jobs {
|
||||
func NewJobs(store db.Implementor, concurrency int) *jobs {
|
||||
if concurrency < 1 {
|
||||
concurrency = 1
|
||||
}
|
||||
j := &jobs{
|
||||
store: store,
|
||||
periodicJobs: make([]common.PeriodicJob, 0),
|
||||
oneOffJobs: make([]common.OneOffJob, 0),
|
||||
sem: semaphore.NewWeighted(int64(concurrency)),
|
||||
}
|
||||
|
||||
j.maintenanceCtx, j.maintenanceCancel = context.WithCancel(
|
||||
@@ -34,7 +38,7 @@ type jobs struct {
|
||||
maintenanceCancel context.CancelFunc
|
||||
maintenanceCtx context.Context
|
||||
apiKey string
|
||||
mux sync.Mutex
|
||||
sem *semaphore.Weighted
|
||||
}
|
||||
|
||||
// Implicit logic is that lockDuration is the actual job Interval, but it is defined by the SQL lock.
|
||||
@@ -67,15 +71,15 @@ func (j *jobs) Spawn(job common.PeriodicJob) {
|
||||
func (j *jobs) RunAll() {
|
||||
slog.DebugContext(j.maintenanceCtx, "Starting maintenance jobs", "periodic", len(j.periodicJobs), "oneoff", len(j.oneOffJobs))
|
||||
|
||||
// NOTE: we run jobs mutually exclusive to preserve resources for main server (those are _maintenance_ jobs anyways)
|
||||
// NOTE: we limit concurrent jobs with semaphore to preserve resources for main server (those are _maintenance_ jobs anyways)
|
||||
// NOTE 2: this does not apply for on-demand ones below - that's why we wrap them only here, unlike AddLocked()
|
||||
|
||||
for _, job := range j.periodicJobs {
|
||||
go common.RunPeriodicJob(j.maintenanceCtx, &mutexPeriodicJob{job: job, mux: &j.mux})
|
||||
go common.RunPeriodicJob(j.maintenanceCtx, &semaphorePeriodicJob{job: job, sem: j.sem})
|
||||
}
|
||||
|
||||
for _, job := range j.oneOffJobs {
|
||||
go common.RunOneOffJob(j.maintenanceCtx, &mutexOneOffJob{job: job, mux: &j.mux}, job.NewParams())
|
||||
go common.RunOneOffJob(j.maintenanceCtx, &semaphoreOneOffJob{job: job, sem: j.sem}, job.NewParams())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -80,7 +80,7 @@ func (j *stubPeriodicJob) wasExecuted() bool {
|
||||
}
|
||||
|
||||
func TestOneOffJobExecution(t *testing.T) {
|
||||
jobsManager := NewJobs(nil)
|
||||
jobsManager := NewJobs(nil, 2)
|
||||
defer jobsManager.Shutdown()
|
||||
|
||||
stubJob := &stubOneOffJob{}
|
||||
@@ -97,7 +97,7 @@ func TestOneOffJobExecution(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestPeriodicJobExecution(t *testing.T) {
|
||||
jobsManager := NewJobs(nil)
|
||||
jobsManager := NewJobs(nil, 2)
|
||||
defer jobsManager.Shutdown()
|
||||
|
||||
stubJob := &stubPeriodicJob{
|
||||
@@ -116,7 +116,7 @@ func TestPeriodicJobExecution(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestJobsSetup(t *testing.T) {
|
||||
jobsManager := NewJobs(nil)
|
||||
jobsManager := NewJobs(nil, 2)
|
||||
defer jobsManager.Shutdown()
|
||||
|
||||
mux := http.NewServeMux()
|
||||
@@ -131,7 +131,7 @@ func TestJobsSetup(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestHandlePeriodicJobWithAPIKey(t *testing.T) {
|
||||
jobsManager := NewJobs(nil)
|
||||
jobsManager := NewJobs(nil, 2)
|
||||
defer jobsManager.Shutdown()
|
||||
|
||||
stubJob := &stubPeriodicJob{
|
||||
@@ -159,7 +159,7 @@ func TestHandlePeriodicJobWithAPIKey(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestHandlePeriodicJobNoAPIKey(t *testing.T) {
|
||||
jobsManager := NewJobs(nil)
|
||||
jobsManager := NewJobs(nil, 2)
|
||||
defer jobsManager.Shutdown()
|
||||
|
||||
mux := http.NewServeMux()
|
||||
@@ -177,7 +177,7 @@ func TestHandlePeriodicJobNoAPIKey(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestHandlePeriodicJobWrongAPIKey(t *testing.T) {
|
||||
jobsManager := NewJobs(nil)
|
||||
jobsManager := NewJobs(nil, 2)
|
||||
defer jobsManager.Shutdown()
|
||||
|
||||
mux := http.NewServeMux()
|
||||
@@ -196,7 +196,7 @@ func TestHandlePeriodicJobWrongAPIKey(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestHandlePeriodicJobNotFound(t *testing.T) {
|
||||
jobsManager := NewJobs(nil)
|
||||
jobsManager := NewJobs(nil, 2)
|
||||
defer jobsManager.Shutdown()
|
||||
|
||||
mux := http.NewServeMux()
|
||||
@@ -215,7 +215,7 @@ func TestHandlePeriodicJobNotFound(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestHandleOneOffJobWithAPIKey(t *testing.T) {
|
||||
jobsManager := NewJobs(nil)
|
||||
jobsManager := NewJobs(nil, 2)
|
||||
defer jobsManager.Shutdown()
|
||||
|
||||
stubJob := &stubOneOffJob{}
|
||||
@@ -241,7 +241,7 @@ func TestHandleOneOffJobWithAPIKey(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestHandleOneOffJobNotFound(t *testing.T) {
|
||||
jobsManager := NewJobs(nil)
|
||||
jobsManager := NewJobs(nil, 2)
|
||||
defer jobsManager.Shutdown()
|
||||
|
||||
mux := http.NewServeMux()
|
||||
@@ -260,7 +260,7 @@ func TestHandleOneOffJobNotFound(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSecurityMiddlewareNoConfiguredKey(t *testing.T) {
|
||||
jobsManager := NewJobs(nil)
|
||||
jobsManager := NewJobs(nil, 2)
|
||||
defer jobsManager.Shutdown()
|
||||
|
||||
mux := http.NewServeMux()
|
||||
@@ -279,7 +279,7 @@ func TestSecurityMiddlewareNoConfiguredKey(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestJobsUpdateConfig(t *testing.T) {
|
||||
jobsManager := NewJobs(nil)
|
||||
jobsManager := NewJobs(nil, 2)
|
||||
defer jobsManager.Shutdown()
|
||||
|
||||
cfg := config.NewBaseConfig(nil)
|
||||
@@ -293,7 +293,7 @@ func TestJobsUpdateConfig(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestJobsSpawn(t *testing.T) {
|
||||
jobsManager := NewJobs(nil)
|
||||
jobsManager := NewJobs(nil, 2)
|
||||
defer jobsManager.Shutdown()
|
||||
|
||||
stubJob := &stubPeriodicJob{
|
||||
|
||||
@@ -1,53 +0,0 @@
|
||||
package maintenance
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/PrivateCaptcha/PrivateCaptcha/pkg/common"
|
||||
)
|
||||
|
||||
type mutexPeriodicJob struct {
|
||||
job common.PeriodicJob
|
||||
mux *sync.Mutex
|
||||
}
|
||||
|
||||
var _ common.PeriodicJob = (*mutexPeriodicJob)(nil)
|
||||
|
||||
func (j *mutexPeriodicJob) Interval() time.Duration { return j.job.Interval() }
|
||||
func (j *mutexPeriodicJob) Jitter() time.Duration { return j.job.Jitter() }
|
||||
func (j *mutexPeriodicJob) Name() string { return j.job.Name() }
|
||||
func (j *mutexPeriodicJob) NewParams() any { return j.job.NewParams() }
|
||||
func (j *mutexPeriodicJob) Trigger() <-chan struct{} { return j.job.Trigger() }
|
||||
func (j *mutexPeriodicJob) Timeout() time.Duration { return j.job.Timeout() }
|
||||
|
||||
func (j *mutexPeriodicJob) RunOnce(ctx context.Context, params any) error {
|
||||
slog.DebugContext(ctx, "About to acquire maintenance job mutex", "job", j.Name())
|
||||
|
||||
j.mux.Lock()
|
||||
defer j.mux.Unlock()
|
||||
|
||||
return j.job.RunOnce(ctx, params)
|
||||
}
|
||||
|
||||
type mutexOneOffJob struct {
|
||||
job common.OneOffJob
|
||||
mux *sync.Mutex
|
||||
}
|
||||
|
||||
var _ common.OneOffJob = (*mutexOneOffJob)(nil)
|
||||
|
||||
func (j *mutexOneOffJob) Name() string { return j.job.Name() }
|
||||
func (j *mutexOneOffJob) InitialPause() time.Duration { return j.job.InitialPause() }
|
||||
func (j *mutexOneOffJob) NewParams() any { return j.job.NewParams() }
|
||||
|
||||
func (j *mutexOneOffJob) RunOnce(ctx context.Context, params any) error {
|
||||
slog.DebugContext(ctx, "About to acquire maintenance job mutex", "job", j.Name())
|
||||
|
||||
j.mux.Lock()
|
||||
defer j.mux.Unlock()
|
||||
|
||||
return j.job.RunOnce(ctx, params)
|
||||
}
|
||||
57
pkg/maintenance/semaphorejob.go
Normal file
57
pkg/maintenance/semaphorejob.go
Normal file
@@ -0,0 +1,57 @@
|
||||
package maintenance
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"github.com/PrivateCaptcha/PrivateCaptcha/pkg/common"
|
||||
"golang.org/x/sync/semaphore"
|
||||
)
|
||||
|
||||
type semaphorePeriodicJob struct {
|
||||
job common.PeriodicJob
|
||||
sem *semaphore.Weighted
|
||||
}
|
||||
|
||||
var _ common.PeriodicJob = (*semaphorePeriodicJob)(nil)
|
||||
|
||||
func (j *semaphorePeriodicJob) Interval() time.Duration { return j.job.Interval() }
|
||||
func (j *semaphorePeriodicJob) Jitter() time.Duration { return j.job.Jitter() }
|
||||
func (j *semaphorePeriodicJob) Name() string { return j.job.Name() }
|
||||
func (j *semaphorePeriodicJob) NewParams() any { return j.job.NewParams() }
|
||||
func (j *semaphorePeriodicJob) Trigger() <-chan struct{} { return j.job.Trigger() }
|
||||
func (j *semaphorePeriodicJob) Timeout() time.Duration { return j.job.Timeout() }
|
||||
|
||||
func (j *semaphorePeriodicJob) RunOnce(ctx context.Context, params any) error {
|
||||
slog.DebugContext(ctx, "About to acquire maintenance job semaphore", "job", j.Name())
|
||||
|
||||
if err := j.sem.Acquire(ctx, 1); err != nil {
|
||||
return err
|
||||
}
|
||||
defer j.sem.Release(1)
|
||||
|
||||
return j.job.RunOnce(ctx, params)
|
||||
}
|
||||
|
||||
type semaphoreOneOffJob struct {
|
||||
job common.OneOffJob
|
||||
sem *semaphore.Weighted
|
||||
}
|
||||
|
||||
var _ common.OneOffJob = (*semaphoreOneOffJob)(nil)
|
||||
|
||||
func (j *semaphoreOneOffJob) Name() string { return j.job.Name() }
|
||||
func (j *semaphoreOneOffJob) InitialPause() time.Duration { return j.job.InitialPause() }
|
||||
func (j *semaphoreOneOffJob) NewParams() any { return j.job.NewParams() }
|
||||
|
||||
func (j *semaphoreOneOffJob) RunOnce(ctx context.Context, params any) error {
|
||||
slog.DebugContext(ctx, "About to acquire maintenance job semaphore", "job", j.Name())
|
||||
|
||||
if err := j.sem.Acquire(ctx, 1); err != nil {
|
||||
return err
|
||||
}
|
||||
defer j.sem.Release(1)
|
||||
|
||||
return j.job.RunOnce(ctx, params)
|
||||
}
|
||||
Reference in New Issue
Block a user