groupware: implement metrics

* implement a framework for metrics, with a few example metrics
This commit is contained in:
Pascal Bleser
2025-08-26 22:11:02 +02:00
parent 780e125621
commit ad9387119b
9 changed files with 263 additions and 34 deletions

View File

@@ -7,7 +7,6 @@ import (
"github.com/oklog/run"
"github.com/opencloud-eu/opencloud/pkg/config/configlog"
"github.com/opencloud-eu/opencloud/pkg/tracing"
"github.com/opencloud-eu/opencloud/pkg/version"
"github.com/opencloud-eu/opencloud/services/groupware/pkg/config"
"github.com/opencloud-eu/opencloud/services/groupware/pkg/config/parser"
"github.com/opencloud-eu/opencloud/services/groupware/pkg/logging"
@@ -37,13 +36,11 @@ func Server(cfg *config.Config) *cli.Command {
var (
gr = run.Group{}
ctx, cancel = context.WithCancel(c.Context)
m = metrics.New()
m = metrics.NewHttpMetrics()
)
defer cancel()
m.BuildInfo.WithLabelValues(version.GetString()).Set(1)
server, err := debug.Server(
debug.Logger(logger),
debug.Config(cfg),

View File

@@ -9,8 +9,11 @@ import (
"github.com/go-chi/chi/v5"
"github.com/prometheus/client_golang/prometheus"
"github.com/opencloud-eu/opencloud/pkg/jmap"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/services/groupware/pkg/metrics"
)
// When the request succeeds without a "since" query parameter.
@@ -574,14 +577,23 @@ func (g *Groupware) RelatedToMessage(w http.ResponseWriter, r *http.Request) {
reqId := req.GetRequestId()
accountId := req.GetAccountId()
logger := log.From(req.logger.With().Str(logEmailId, log.SafeString(id)))
getEmailsBefore := time.Now()
emails, sessionState, jerr := g.jmap.GetEmails(accountId, req.session, req.ctx, logger, []string{id}, true, g.maxBodyValueBytes)
getEmailsDuration := time.Since(getEmailsBefore)
if jerr != nil {
return req.errorResponseFromJmap(jerr)
}
if len(emails.Emails) < 1 {
g.metrics.EmailByIdDuration.WithLabelValues(req.session.ApiUrl, metrics.ResultNotFound).Observe(getEmailsDuration.Seconds())
logger.Trace().Msg("failed to find any emails matching id") // the id is already in the log field
return notFoundResponse(sessionState)
}
g.metrics.EmailByIdDuration.
WithLabelValues(req.session.ApiUrl, metrics.ResultFound).(prometheus.ExemplarObserver).
ObserveWithExemplar(getEmailsDuration.Seconds(), prometheus.Labels{
metrics.Labels.RequestId: reqId,
})
email := emails.Emails[0]
beacon := email.ReceivedAt // TODO configurable: either relative to when the email was received, or relative to now
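
Note on the ObserveWithExemplar call above: the exemplar attaches the request id to the recorded sample, but exemplars are only exposed to scrapers when the metrics endpoint serves the OpenMetrics format. The diff does not show how this service wires /metrics, so the handler below is a sketch under that assumption:

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// Sketch only: serve the default registry with OpenMetrics enabled so that
// exemplars recorded via ObserveWithExemplar show up in scrapes.
func metricsHandler() http.Handler {
	return promhttp.HandlerFor(
		prometheus.DefaultGatherer,
		promhttp.HandlerOpts{EnableOpenMetrics: true},
	)
}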

View File

@@ -19,11 +19,15 @@ import (
"github.com/r3labs/sse/v2"
"github.com/rs/zerolog"
"github.com/prometheus/client_golang/prometheus"
"github.com/jellydator/ttlcache/v3"
"github.com/opencloud-eu/opencloud/pkg/jmap"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/services/groupware/pkg/config"
"github.com/opencloud-eu/opencloud/services/groupware/pkg/metrics"
)
const (
@@ -40,6 +44,11 @@ const (
logFolderId = "folder-id"
logQuery = "query"
logEmailId = "email-id"
logJobDescription = "job"
logJobId = "job-id"
logStreamId = "stream-id"
logPath = "path"
logMethod = "method"
)
type User interface {
@@ -61,6 +70,7 @@ type Job struct {
type Groupware struct {
mux *chi.Mux
metrics *metrics.Metrics
sseServer *sse.Server
streams map[string]time.Time
streamsLock sync.Mutex
@@ -94,12 +104,16 @@ func (e GroupwareInitializationError) Unwrap() error {
type GroupwareSessionEventListener struct {
logger *log.Logger
sessionCache *ttlcache.Cache[string, cachedSession]
counter prometheus.Counter
}
func (l GroupwareSessionEventListener) OnSessionOutdated(session *jmap.Session, newSessionState jmap.SessionState) {
// it's enough to remove the session from the cache, as it will be fetched on-demand
// the next time an operation is performed on behalf of the user
l.sessionCache.Delete(session.Username)
if l.counter != nil {
l.counter.Inc()
}
l.logger.Trace().Msgf("removed outdated session for user '%v': state %v -> %v", session.Username, session.State, newSessionState)
}
@@ -112,6 +126,35 @@ type Event struct {
Body any
}
type ConstMetricCollector struct {
metric prometheus.Metric
}
func (c ConstMetricCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- c.metric.Desc()
}
func (c ConstMetricCollector) Collect(ch chan<- prometheus.Metric) {
ch <- c.metric
}
type SessionCacheMetricsCollector struct {
desc *prometheus.Desc
supply func() ttlcache.Metrics
}
func (s SessionCacheMetricsCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- s.desc
}
func (s SessionCacheMetricsCollector) Collect(ch chan<- prometheus.Metric) {
m := s.supply()
ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Evictions), metrics.SessionCacheTypeEvictions)
ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Insertions), metrics.SessionCacheTypeInsertions)
ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Hits), metrics.SessionCacheTypeHits)
ch <- prometheus.MustNewConstMetric(s.desc, prometheus.GaugeValue, float64(m.Misses), metrics.SessionCacheTypeMisses)
}
var _ prometheus.Collector = SessionCacheMetricsCollector{}
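
Since the collector only snapshots ttlcache's counters into const metrics, it can be exercised in isolation. A sketch of such a check; the test name and placement are assumptions, not part of this commit:

package groupware

import (
	"testing"

	"github.com/jellydator/ttlcache/v3"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"

	"github.com/opencloud-eu/opencloud/services/groupware/pkg/metrics"
)

// Sketch only: a throwaway cache is enough, because Collect just reads cache.Metrics().
func TestSessionCacheCollectorEmitsFourSeries(t *testing.T) {
	cache := ttlcache.New[string, cachedSession]()
	desc := prometheus.NewDesc(
		prometheus.BuildFQName(metrics.Namespace, metrics.Subsystem, "session_cache"),
		"Session cache statistics",
		[]string{metrics.Labels.SessionCacheType},
		nil,
	)
	c := SessionCacheMetricsCollector{desc: desc, supply: cache.Metrics}
	if got := testutil.CollectAndCount(c); got != 4 {
		t.Fatalf("expected 4 series (insertions, hits, misses, evictions), got %d", got)
	}
}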
func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Groupware, error) {
baseUrl, err := url.Parse(config.Mail.BaseUrl)
if err != nil {
@@ -130,6 +173,8 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
return nil, GroupwareInitializationError{Message: "Mail.Master.Password is empty"}
}
m := metrics.New()
defaultEmailLimit := max(config.Mail.DefaultEmailLimit, 0)
maxBodyValueBytes := max(config.Mail.MaxBodyValueBytes, 0)
responseHeaderTimeout := max(config.Mail.ResponseHeaderTimeout, 0)
@@ -137,7 +182,12 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
sessionCacheTtl := max(config.Mail.SessionCache.Ttl, 0)
sessionFailureCacheTtl := max(config.Mail.SessionCache.FailureTtl, 0)
keepStreamsAlive := true // TODO configuration
eventChannelSize := 100 // TODO make channel queue buffering size configurable
workerQueueSize := 100 // TODO configuration setting
workerPoolSize := 10 // TODO configuration setting
keepStreamsAliveInterval := time.Duration(30) * time.Second // TODO configuration, make it 0 to disable keepalive
sseEventTtl := time.Duration(5) * time.Minute // TODO configuration setting
tr := http.DefaultTransport.(*http.Transport).Clone()
tr.ResponseHeaderTimeout = responseHeaderTimeout
@@ -172,6 +222,8 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
ttlcache.WithLoader(sessionLoader),
)
go sessionCache.Start()
prometheus.Register(SessionCacheMetricsCollector{desc: m.SessionCacheDesc, supply: sessionCache.Metrics})
}
if logger.Trace().Enabled() {
@@ -196,20 +248,113 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
})
}
sessionEventListener := GroupwareSessionEventListener{sessionCache: sessionCache, logger: logger}
sessionEventListener := GroupwareSessionEventListener{
sessionCache: sessionCache,
logger: logger,
counter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: metrics.Namespace,
Subsystem: metrics.Subsystem,
Name: "outdated_sessions",
Help: "Counts outdated session events",
}),
}
jmapClient.AddSessionEventListener(&sessionEventListener)
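
The outdated_sessions counter above is created with prometheus.NewCounter but, in the hunks shown, never registered, so it would not appear on /metrics unless registration happens elsewhere. A minimal sketch of explicit registration; the placement inside NewGroupware is an assumption:

if err := prometheus.Register(sessionEventListener.counter); err != nil {
	logger.Warn().Err(err).Msg("failed to register outdated_sessions counter")
}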
eventChannel := make(chan Event, 100) // TODO make channel queue buffering size configurable
eventChannel := make(chan Event, eventChannelSize)
{
totalWorkerBufferMetric, err := prometheus.NewConstMetric(prometheus.NewDesc(
prometheus.BuildFQName(metrics.Namespace, metrics.Subsystem, "event_buffer_size"),
"Size of the buffer channel for server-sent events to process",
nil,
nil,
), prometheus.GaugeValue, float64(eventChannelSize))
if err != nil {
logger.Warn().Err(err).Msg("failed to create event_buffer_size metric")
} else {
prometheus.MustRegister(ConstMetricCollector{metric: totalWorkerBufferMetric})
}
prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: metrics.Namespace,
Subsystem: metrics.Subsystem,
Name: "event_buffer_queued",
Help: "Number of queued server-sent events",
}, func() float64 {
return float64(len(eventChannel))
}))
}
sseServer := sse.New()
sseServer.EventTTL = time.Duration(5) * time.Minute // TODO configuration setting
sseServer.EventTTL = sseEventTtl
{
var sseSubscribers atomic.Int32
sseServer.OnSubscribe = func(streamID string, sub *sse.Subscriber) {
sseSubscribers.Add(1)
}
sseServer.OnUnsubscribe = func(streamID string, sub *sse.Subscriber) {
sseSubscribers.Add(-1)
}
prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: metrics.Namespace,
Subsystem: metrics.Subsystem,
Name: "sse_subscribers",
Help: "Number of subscribers for server-sent event streams",
}, func() float64 {
return float64(sseSubscribers.Load())
}))
}
workerQueueSize := 100 // TODO configuration setting
workerPoolSize := 10 // TODO configuration setting
jobsChannel := make(chan Job, workerQueueSize)
{
totalWorkerBufferMetric, err := prometheus.NewConstMetric(prometheus.NewDesc(
prometheus.BuildFQName(metrics.Namespace, metrics.Subsystem, "workers_buffer_size"),
"Size of the buffer channel for background worker jobs",
nil,
nil,
), prometheus.GaugeValue, float64(workerQueueSize))
if err != nil {
logger.Warn().Err(err).Msg("failed to create workers_buffer_size metric")
} else {
prometheus.MustRegister(ConstMetricCollector{metric: totalWorkerBufferMetric})
}
prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: metrics.Namespace,
Subsystem: metrics.Subsystem,
Name: "workers_buffer_queued",
Help: "Number of queued background jobs",
}, func() float64 {
return float64(len(jobsChannel))
}))
}
var busyWorkers atomic.Int32
{
totalWorkersMetric, err := prometheus.NewConstMetric(prometheus.NewDesc(
prometheus.BuildFQName(metrics.Namespace, metrics.Subsystem, "workers_total"),
"Total amount of background job workers",
nil,
nil,
), prometheus.GaugeValue, float64(workerPoolSize))
if err != nil {
logger.Warn().Err(err).Msg("failed to create workers_total metric")
} else {
prometheus.MustRegister(ConstMetricCollector{metric: totalWorkersMetric})
}
prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: metrics.Namespace,
Subsystem: metrics.Subsystem,
Name: "workers_busy",
Help: "Number of background job workers that are currently busy executing jobs",
}, func() float64 {
return float64(busyWorkers.Load())
}))
}
g := &Groupware{
mux: mux,
metrics: m,
sseServer: sseServer,
streams: map[string]time.Time{},
streamsLock: sync.Mutex{},
@@ -225,11 +370,11 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
}
for w := 1; w <= workerPoolSize; w++ {
go g.worker(jobsChannel)
go g.worker(jobsChannel, &busyWorkers)
}
if keepStreamsAlive {
ticker := time.NewTicker(time.Duration(30) * time.Second) // TODO configuration
if keepStreamsAliveInterval != 0 {
ticker := time.NewTicker(keepStreamsAliveInterval)
//defer ticker.Stop()
go func() {
for range ticker.C {
@@ -243,12 +388,16 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux) (*Gro
return g, nil
}
func (g *Groupware) worker(jobs <-chan Job) {
func (g *Groupware) worker(jobs <-chan Job, busy *atomic.Int32) {
for job := range jobs {
busy.Add(1)
before := time.Now()
logger := log.From(job.logger.With().Str("job", job.description).Uint64("job-id", job.id))
logger := log.From(job.logger.With().Str(logJobDescription, job.description).Uint64(logJobId, job.id))
job.job(job.id, logger)
logger.Trace().Msgf("finished job %d [%s] in %v", job.id, job.description, time.Since(before)) // TODO remove
if logger.Trace().Enabled() {
logger.Trace().Msgf("finished job %d [%s] in %v", job.id, job.description, time.Since(before)) // TODO remove
}
busy.Add(-1)
}
}
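
One caveat in the worker loop: if job.job panics, busy is never decremented and the workers_busy gauge drifts upward while the worker goroutine dies. A defensive variant as a sketch; the recover policy and helper name are assumptions, not part of this commit:

// Sketch only: keeps the workers_busy gauge accurate even if a job panics.
func (g *Groupware) runJob(job Job, busy *atomic.Int32) {
	busy.Add(1)
	defer busy.Add(-1)
	defer func() {
		if r := recover(); r != nil {
			g.logger.Error().Msgf("job %d [%s] panicked: %v", job.id, job.description, r)
		}
	}()
	logger := log.From(job.logger.With().Str(logJobDescription, job.description).Uint64(logJobId, job.id))
	job.job(job.id, logger)
}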
@@ -269,7 +418,7 @@ func (g *Groupware) listenForEvents() {
Data: data,
})
if !published && g.logger.Debug().Enabled() {
g.logger.Debug().Str("stream", log.SafeString(ev.Stream)).Msgf("dropped SSE event") // TODO more details
g.logger.Debug().Str(logStreamId, log.SafeString(ev.Stream)).Msgf("dropped SSE event") // TODO more details
}
} else {
g.logger.Error().Err(err).Msgf("failed to serialize %T body to JSON", ev)
@@ -778,7 +927,7 @@ func (g *Groupware) NotFound(w http.ResponseWriter, r *http.Request) {
if level.Enabled() {
path := log.SafeString(r.URL.Path)
method := log.SafeString(r.Method)
level.Str("path", path).Str("method", method).Int(logErrorStatus, http.StatusNotFound).Msgf("unmatched path: '%v'", path)
level.Str(logPath, path).Str(logMethod, method).Int(logErrorStatus, http.StatusNotFound).Msgf("unmatched path: '%v'", path)
}
w.WriteHeader(http.StatusNotFound)
}
@@ -788,7 +937,7 @@ func (g *Groupware) MethodNotAllowed(w http.ResponseWriter, r *http.Request) {
if level.Enabled() {
path := log.SafeString(r.URL.Path)
method := log.SafeString(r.Method)
level.Str("path", path).Str("method", method).Int(logErrorStatus, http.StatusNotFound).Msgf("method not allowed: '%v'", method)
level.Str(logPath, path).Str(logMethod, method).Int(logErrorStatus, http.StatusNotFound).Msgf("method not allowed: '%v'", method)
}
w.WriteHeader(http.StatusNotFound)
}

View File

@@ -0,0 +1,11 @@
package metrics
type HttpMetrics struct {
}
// NewHttpMetrics initializes the available HTTP metrics.
func NewHttpMetrics() *HttpMetrics {
m := &HttpMetrics{}
return m
}
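
HttpMetrics is still an empty shell; the options and instrument middleware further down only thread it through. One way it could grow, sketched here; the field, metric name, and labels are assumptions and not part of this commit:

package metrics

import "github.com/prometheus/client_golang/prometheus"

// Hypothetical extension; nothing in this commit defines these fields.
type HttpMetrics struct {
	RequestDuration *prometheus.HistogramVec
}

func NewHttpMetrics() *HttpMetrics {
	m := &HttpMetrics{
		RequestDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
			Namespace: Namespace,
			Subsystem: Subsystem,
			Name:      "http_request_duration_seconds",
			Help:      "Duration of handled HTTP requests in seconds",
		}, []string{"method", "code"}),
	}
	prometheus.MustRegister(m.RequestDuration)
	return m
}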

View File

@@ -1,8 +1,10 @@
package metrics
import "github.com/prometheus/client_golang/prometheus"
import (
"github.com/prometheus/client_golang/prometheus"
)
var (
const (
// Namespace defines the namespace for the defined metrics.
Namespace = "opencloud"
@@ -12,23 +14,50 @@ var (
// Metrics defines the available metrics of this service.
type Metrics struct {
BuildInfo *prometheus.GaugeVec
SessionCacheDesc *prometheus.Desc
/*SSessionCache *prometheus.GaugeVec*/
EmailByIdDuration *prometheus.HistogramVec
}
const (
ResultFound = "found"
ResultNotFound = "not-found"
SessionCacheTypeInsertions = "insertions"
SessionCacheTypeHits = "hits"
SessionCacheTypeMisses = "misses"
SessionCacheTypeEvictions = "evictions"
)
var Labels = struct {
Endpoint string
Result string
SessionCacheType string
RequestId string
}{
Endpoint: "endpoint",
Result: "result",
SessionCacheType: "type",
RequestId: "requestId",
}
// New initializes the available metrics.
func New() *Metrics {
m := &Metrics{
BuildInfo: prometheus.NewGaugeVec(prometheus.GaugeOpts{
SessionCacheDesc: prometheus.NewDesc(
prometheus.BuildFQName(Namespace, Subsystem, "session_cache"),
"Session cache statistics",
[]string{Labels.SessionCacheType},
nil,
),
EmailByIdDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "build_info",
Help: "Build information",
}, []string{"version"}),
//Buckets: []float64{0.1, 0.5, 1, 2.5, 5, 10, 30, 60, 120, 300, 600, 1200},
Name: "email_by_id_duration_seconds",
Help: "Duration in seconds for retrieving an Email by its id",
}, []string{Labels.Endpoint, Labels.Result}),
}
_ = prometheus.Register(
m.BuildInfo,
)
return m
}
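
With the old prometheus.Register(m.BuildInfo) call gone, nothing in this hunk registers the new EmailByIdDuration histogram; unless that happens in the HTTP service wiring, the series will never be scraped. A minimal sketch of registering it inside New(), tolerant of repeated construction — an assumption, not part of this commit:

if err := prometheus.Register(m.EmailByIdDuration); err != nil {
	if _, ok := err.(prometheus.AlreadyRegisteredError); !ok {
		panic(err) // anything other than "already registered" is a programming error
	}
}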

View File

@@ -0,0 +1,28 @@
package metrics
import (
"sync/atomic"
"github.com/opencloud-eu/opencloud/pkg/version"
"github.com/prometheus/client_golang/prometheus"
)
var registered atomic.Bool
func StartupMetrics() {
// use an atomic boolean to make the operation idempotent,
// instead of causing a panic in case this function is
// called twice
if registered.CompareAndSwap(false, true) {
// https://github.com/prometheus/common/blob/8558a5b7db3c84fa38b4766966059a7bd5bfa2ee/version/info.go#L36-L56
prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "build_info",
Help: "Build information",
ConstLabels: prometheus.Labels{
"version": version.GetString(),
},
}, func() float64 { return 1 }))
}
}
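
Because of the atomic guard, StartupMetrics can be called from every service constructor without panicking on duplicate registration. A sketch of a test for both properties; test name and placement are assumptions:

package metrics

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// Sketch only: calling StartupMetrics twice must register exactly one build_info series.
func TestStartupMetricsIsIdempotent(t *testing.T) {
	StartupMetrics()
	StartupMetrics() // second call must be a no-op, not a panic
	name := prometheus.BuildFQName(Namespace, Subsystem, "build_info")
	n, err := testutil.GatherAndCount(prometheus.DefaultGatherer, name)
	if err != nil {
		t.Fatal(err)
	}
	if n != 1 {
		t.Fatalf("expected exactly one %s series, got %d", name, n)
	}
}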

View File

@@ -20,7 +20,7 @@ type Options struct {
Logger log.Logger
Context context.Context
Config *config.Config
Metrics *metrics.Metrics
Metrics *metrics.HttpMetrics
Flags []cli.Flag
TraceProvider trace.TracerProvider
}
@@ -58,7 +58,7 @@ func Config(val *config.Config) Option {
}
// Metrics provides a function to set the metrics option.
func Metrics(val *metrics.Metrics) Option {
func Metrics(val *metrics.HttpMetrics) Option {
return func(o *Options) {
o.Metrics = val
}

View File

@@ -7,7 +7,7 @@ import (
)
// NewInstrument returns a service that instruments metrics.
func NewInstrument(next Service, metrics *metrics.Metrics) Service {
func NewInstrument(next Service, metrics *metrics.HttpMetrics) Service {
return instrument{
next: next,
metrics: metrics,
@@ -16,7 +16,7 @@ func NewInstrument(next Service, metrics *metrics.Metrics) Service {
type instrument struct {
next Service
metrics *metrics.Metrics
metrics *metrics.HttpMetrics
}
// ServeHTTP implements the Service interface.

View File

@@ -10,6 +10,7 @@ import (
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/pkg/tracing"
"github.com/opencloud-eu/opencloud/services/groupware/pkg/groupware"
"github.com/opencloud-eu/opencloud/services/groupware/pkg/metrics"
)
// Service defines the service handlers.
@@ -52,5 +53,7 @@ func NewService(opts ...Option) (Service, error) {
}
}
metrics.StartupMetrics()
return gw, nil
}