Tag more service contexts

This commit is contained in:
Taras Kushnir
2025-09-13 09:15:10 +03:00
parent bc983cdd95
commit f842dc97a4
4 changed files with 18 additions and 9 deletions

View File

@@ -15,6 +15,7 @@ import (
const (
userLimitTTL = 1 * time.Hour
userLimitRefresh = 3 * time.Hour
AuthService = "auth"
)
type UserLimiter interface {
@@ -131,13 +132,15 @@ func NewAuthMiddleware(store db.Implementor,
func (am *AuthMiddleware) StartBackfill(backfillDelay time.Duration) {
var sitekeyBackfillCtx context.Context
sitekeyBackfillBaseCtx := context.WithValue(context.Background(), common.ServiceContextKey, AuthService)
sitekeyBackfillCtx, am.SitekeyBackfillCancel = context.WithCancel(
context.WithValue(context.Background(), common.TraceIDContextKey, "sitekey_backfill"))
context.WithValue(sitekeyBackfillBaseCtx, common.TraceIDContextKey, "sitekey_backfill"))
go common.ProcessBatchMap(sitekeyBackfillCtx, am.SitekeyChan, backfillDelay, am.BatchSize, am.BatchSize*100, am.backfillSitekeyImpl)
var usersBackfillCtx context.Context
userBackfillBaseCtx := context.WithValue(context.Background(), common.ServiceContextKey, AuthService)
usersBackfillCtx, am.UsersBackfillCancel = context.WithCancel(
context.WithValue(context.Background(), common.TraceIDContextKey, "users_backfill"))
context.WithValue(userBackfillBaseCtx, common.TraceIDContextKey, "users_backfill"))
// NOTE: we use the same backfill delay because users processing is slower and sitekey channel will block on it
go common.ProcessBatchMap(usersBackfillCtx, am.UsersChan, backfillDelay, am.BatchSize, am.BatchSize*10, am.backfillUsersImpl)
}

View File

@@ -25,6 +25,7 @@ const (
PropertyBucketSize = 5 * time.Minute
updateLimitsBatchSize = 100
maxVerifyBatchSize = 100_000
apiService = "api"
)
var (
@@ -125,9 +126,9 @@ func (s *Server) Init(ctx context.Context, verifyFlushInterval, authBackfillDela
s.Levels.Init(2*time.Second /*access log interval*/, PropertyBucketSize /*backfill interval*/)
s.Auth.StartBackfill(authBackfillDelay)
baseVerifyCtx := context.WithValue(context.Background(), common.ServiceContextKey, apiService)
var cancelVerifyCtx context.Context
cancelVerifyCtx, s.VerifyLogCancel = context.WithCancel(
context.WithValue(context.Background(), common.TraceIDContextKey, "flush_verify_log"))
cancelVerifyCtx, s.VerifyLogCancel = context.WithCancel(context.WithValue(baseVerifyCtx, common.TraceIDContextKey, "flush_verify_log"))
go common.ProcessBatchArray(cancelVerifyCtx, s.VerifyLogChan, verifyFlushInterval, VerifyBatchSize, maxVerifyBatchSize, s.TimeSeries.WriteVerifyLogBatch)
@@ -148,7 +149,9 @@ func (s *Server) Setup(router *http.ServeMux, domain string, verbose bool, secur
}
if corsOpts.Debug {
corsOpts.Logger = &common.FmtLogger{Ctx: common.TraceContext(context.TODO(), "cors"), Level: common.LevelTrace}
ctx := common.TraceContext(context.TODO(), "cors")
ctx = context.WithValue(ctx, common.ServiceContextKey, apiService)
corsOpts.Logger = &common.FmtLogger{Ctx: ctx, Level: common.LevelTrace}
}
s.Cors = cors.New(corsOpts)
@@ -168,7 +171,7 @@ func (s *Server) Shutdown() {
func (s *Server) setupWithPrefix(domain string, router *http.ServeMux, corsHandler, security alice.Constructor) {
prefix := domain + "/"
slog.Debug("Setting up the API routes", "prefix", prefix)
svc := common.ServiceMiddleware("api")
svc := common.ServiceMiddleware(apiService)
publicChain := alice.New(svc, common.Recovered, security, s.Metrics.Handler)
// NOTE: auth middleware provides rate limiting internally
puzzleChain := publicChain.Append(s.RateLimiter.RateLimit, monitoring.Traced, common.TimeoutHandler(1*time.Second))

View File

@@ -47,7 +47,7 @@ func connectClickhouse(ctx context.Context, opts ClickHouseConnectOpts) *sql.DB
//},
Debug: opts.Verbose,
Debugf: func(format string, v ...any) {
slog.Log(ctx, common.LevelTrace, fmt.Sprintf(format, v...), "source", "clickhouse")
slog.Log(context.TODO(), common.LevelTrace, fmt.Sprintf(format, v...), common.TraceIDAttr("clickhouse"))
},
//BlockBufferSize: 10,
//MaxCompressionBuffer: 10240,

View File

@@ -96,13 +96,16 @@ func requestsToDifficulty(requests float64, minDifficulty uint8, level dbgen.Dif
func (levels *Levels) Init(accessLogInterval, backfillInterval time.Duration) {
const (
maxPendingBatchSize = 100_000
levelsService = "levels"
)
var accessCtx context.Context
accessBaseCtx := context.WithValue(context.Background(), common.ServiceContextKey, levelsService)
accessCtx, levels.accessLogCancel = context.WithCancel(
context.WithValue(context.Background(), common.TraceIDContextKey, "access_log"))
context.WithValue(accessBaseCtx, common.TraceIDContextKey, "access_log"))
go common.ProcessBatchArray(accessCtx, levels.accessChan, accessLogInterval, levels.batchSize, maxPendingBatchSize, levels.timeSeries.WriteAccessLogBatch)
go levels.backfillDifficulty(context.WithValue(context.Background(), common.TraceIDContextKey, "backfill_difficulty"),
difficultyBaseCtx := context.WithValue(context.Background(), common.ServiceContextKey, levelsService)
go levels.backfillDifficulty(context.WithValue(difficultyBaseCtx, common.TraceIDContextKey, "backfill_difficulty"),
backfillInterval)
}