groupware: session handling improvements

* remove the baseurl from the JMAP client configuration, and pass it to
   the session retrieval functions instead, as that is really the only
   place where it is relevant, and we gain flexibility to discover that
   session URL differently in the future without having to touch the
   JMAP client

 * move the default account identifier handling from the JMAP package to
   the Groupware one, as it really has nothing to do with JMAP itself,
   and is an opinionated feature of the Groupware REST API instead

 * add an event listener interface for JMAP events to be more flexible
   and universal, typically for metrics that are defined on the API
   level that uses the JMAP client

 * add errors for when default accounts cannot be determined

 * split groupware_framework.go into groupware_framework.go,
   groupware_request.go and groupware_response.go

 * move the accountId logging into the Groupware level instead of JMAP
   since it can also be relevant to other operations that might be
   worthy of logging before the JMAP client is even invoked
This commit is contained in:
Pascal Bleser
2025-09-03 16:36:30 +02:00
parent b76bec1279
commit f52a645c8a
25 changed files with 903 additions and 624 deletions
@@ -46,8 +46,8 @@ func DefaultConfig() *config.Config {
Namespace: "eu.opencloud.web",
CORS: config.CORS{
AllowedOrigins: []string{"*"},
AllowedMethods: []string{"GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"},
AllowedHeaders: []string{"Authorization", "Origin", "Content-Type", "Accept", "X-Requested-With", "X-Request-Id", "Cache-Control"},
AllowedMethods: []string{"GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS", "REPORT"},
AllowedHeaders: []string{"Authorization", "Origin", "Content-Type", "Accept", "X-Requested-With", "X-Request-Id", "Trace-Id", "Cache-Control"},
AllowCredentials: true,
},
},
@@ -4,12 +4,13 @@ import (
"net/http"
"github.com/opencloud-eu/opencloud/pkg/jmap"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/pkg/structs"
)
func (g *Groupware) GetAccount(w http.ResponseWriter, r *http.Request) {
g.respond(w, r, func(req Request) Response {
account, err := req.GetAccount()
account, err := req.GetAccountForMail()
if err != nil {
return errorResponse(err)
}
@@ -56,10 +57,14 @@ type SwaggerAccountBootstrapResponse struct {
func (g *Groupware) GetAccountBootstrap(w http.ResponseWriter, r *http.Request) {
g.respond(w, r, func(req Request) Response {
mailAccountId := req.GetAccountId()
mailAccountId, err := req.GetAccountIdForMail()
if err != nil {
return errorResponse(err)
}
logger := log.From(req.logger.With().Str(logAccountId, mailAccountId))
accountIds := structs.Keys(req.session.Accounts)
resp, sessionState, jerr := g.jmap.GetIdentitiesAndMailboxes(mailAccountId, accountIds, req.session, req.ctx, req.logger)
resp, sessionState, jerr := g.jmap.GetIdentitiesAndMailboxes(mailAccountId, accountIds, req.session, req.ctx, logger)
if jerr != nil {
return req.errorResponseFromJmap(jerr)
}
@@ -8,6 +8,7 @@ import (
"github.com/go-chi/chi/v5"
"github.com/opencloud-eu/opencloud/pkg/jmap"
"github.com/opencloud-eu/opencloud/pkg/log"
)
const (
@@ -21,9 +22,15 @@ func (g *Groupware) GetBlob(w http.ResponseWriter, r *http.Request) {
return req.parameterErrorResponse(UriParamBlobId, fmt.Sprintf("Invalid value for path parameter '%v': empty", UriParamBlobId))
}
res, _, err := g.jmap.GetBlob(req.GetAccountId(), req.session, req.ctx, req.logger, blobId)
accountId, err := req.GetAccountIdForBlob()
if err != nil {
return req.errorResponseFromJmap(err)
return errorResponse(err)
}
logger := log.From(req.logger.With().Str(logAccountId, accountId))
res, _, jerr := g.jmap.GetBlob(accountId, req.session, req.ctx, logger, blobId)
if jerr != nil {
return req.errorResponseFromJmap(jerr)
}
blob := res.Blob
if blob == nil {
@@ -46,9 +53,15 @@ func (g *Groupware) UploadBlob(w http.ResponseWriter, r *http.Request) {
}(body)
}
resp, err := g.jmap.UploadBlobStream(req.GetAccountId(), req.session, req.ctx, req.logger, contentType, body)
accountId, err := req.GetAccountIdForBlob()
if err != nil {
return req.errorResponseFromJmap(err)
return errorResponse(err)
}
logger := log.From(req.logger.With().Str(logAccountId, accountId))
resp, jerr := g.jmap.UploadBlobStream(accountId, req.session, req.ctx, logger, contentType, body)
if jerr != nil {
return req.errorResponseFromJmap(jerr)
}
return etagOnlyResponse(resp, jmap.State(resp.Sha512))
@@ -60,17 +73,23 @@ func (g *Groupware) DownloadBlob(w http.ResponseWriter, r *http.Request) {
blobId := chi.URLParam(req.r, UriParamBlobId)
name := chi.URLParam(req.r, UriParamBlobName)
q := req.r.URL.Query()
tipe := q.Get(QueryParamBlobType)
if tipe == "" {
tipe = DefaultBlobDownloadType
typ := q.Get(QueryParamBlobType)
if typ == "" {
typ = DefaultBlobDownloadType
}
blob, jerr := g.jmap.DownloadBlobStream(req.GetAccountId(), blobId, name, tipe, req.session, req.ctx, req.logger)
accountId, gwerr := req.GetAccountIdForBlob()
if gwerr != nil {
return gwerr
}
logger := log.From(req.logger.With().Str(logAccountId, accountId))
blob, jerr := g.jmap.DownloadBlobStream(accountId, blobId, name, typ, req.session, req.ctx, logger)
if blob != nil && blob.Body != nil {
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
req.logger.Error().Err(err).Msg("failed to close response body")
logger.Error().Err(err).Msg("failed to close response body")
}
}(blob.Body)
}
@@ -4,6 +4,7 @@ import (
"net/http"
"github.com/opencloud-eu/opencloud/pkg/jmap"
"github.com/opencloud-eu/opencloud/pkg/log"
)
// When the request suceeds.
@@ -26,9 +27,14 @@ type SwaggerGetIdentitiesResponse struct {
// 500: ErrorResponse500
func (g *Groupware) GetIdentities(w http.ResponseWriter, r *http.Request) {
g.respond(w, r, func(req Request) Response {
res, sessionState, err := g.jmap.GetIdentity(req.GetAccountId(), req.session, req.ctx, req.logger)
accountId, err := req.GetAccountIdWithoutFallback()
if err != nil {
return req.errorResponseFromJmap(err)
return errorResponse(err)
}
logger := log.From(req.logger.With().Str(logAccountId, accountId))
res, sessionState, jerr := g.jmap.GetIdentity(accountId, req.session, req.ctx, logger)
if jerr != nil {
return req.errorResponseFromJmap(jerr)
}
return etagResponse(res, sessionState, res.State)
})
@@ -6,6 +6,7 @@ import (
"github.com/go-chi/chi/v5"
"github.com/opencloud-eu/opencloud/pkg/jmap"
"github.com/opencloud-eu/opencloud/pkg/log"
)
// When the request succeeds.
@@ -33,9 +34,14 @@ type SwaggerGetMailboxById200 struct {
func (g *Groupware) GetMailbox(w http.ResponseWriter, r *http.Request) {
mailboxId := chi.URLParam(r, UriParamMailboxId)
g.respond(w, r, func(req Request) Response {
res, sessionState, err := g.jmap.GetMailbox(req.GetAccountId(), req.session, req.ctx, req.logger, []string{mailboxId})
accountId, err := req.GetAccountIdForMail()
if err != nil {
return req.errorResponseFromJmap(err)
return errorResponse(err)
}
res, sessionState, jerr := g.jmap.GetMailbox(accountId, req.session, req.ctx, req.logger, []string{mailboxId})
if jerr != nil {
return req.errorResponseFromJmap(jerr)
}
if len(res.Mailboxes) == 1 {
@@ -107,14 +113,20 @@ func (g *Groupware) GetMailboxes(w http.ResponseWriter, r *http.Request) {
hasCriteria = true
}
accountId, err := req.GetAccountIdForMail()
if err != nil {
return errorResponse(err)
}
logger := log.From(req.logger.With().Str(logAccountId, accountId))
if hasCriteria {
mailboxes, sessionState, err := g.jmap.SearchMailboxes(req.GetAccountId(), req.session, req.ctx, req.logger, filter)
mailboxes, sessionState, err := g.jmap.SearchMailboxes(accountId, req.session, req.ctx, logger, filter)
if err != nil {
return req.errorResponseFromJmap(err)
}
return etagResponse(mailboxes.Mailboxes, sessionState, mailboxes.State)
} else {
mailboxes, sessionState, err := g.jmap.GetAllMailboxes(req.GetAccountId(), req.session, req.ctx, req.logger)
mailboxes, sessionState, err := g.jmap.GetAllMailboxes(accountId, req.session, req.ctx, logger)
if err != nil {
return req.errorResponseFromJmap(err)
}
@@ -64,9 +64,15 @@ func (g *Groupware) GetAllMessagesInMailbox(w http.ResponseWriter, r *http.Reque
if mailboxId == "" {
return req.parameterErrorResponse(UriParamMailboxId, fmt.Sprintf("Missing required mailbox ID path parameter '%v'", UriParamMailboxId))
}
logger := log.From(req.logger.With().Str(HeaderSince, since))
emails, sessionState, jerr := g.jmap.GetEmailsInMailboxSince(req.GetAccountId(), req.session, req.ctx, logger, mailboxId, since, true, g.maxBodyValueBytes, maxChanges)
accountId, err := req.GetAccountIdForMail()
if err != nil {
return errorResponse(err)
}
logger := log.From(req.logger.With().Str(HeaderSince, since).Str(logAccountId, accountId))
emails, sessionState, jerr := g.jmap.GetEmailsInMailboxSince(accountId, req.session, req.ctx, logger, mailboxId, since, true, g.maxBodyValueBytes, maxChanges)
if jerr != nil {
return req.errorResponseFromJmap(jerr)
}
@@ -95,9 +101,15 @@ func (g *Groupware) GetAllMessagesInMailbox(w http.ResponseWriter, r *http.Reque
l = l.Uint(QueryParamLimit, limit)
}
accountId, err := req.GetAccountIdForMail()
if err != nil {
return errorResponse(err)
}
l = l.Str(logAccountId, accountId)
logger := log.From(l)
emails, sessionState, jerr := g.jmap.GetAllEmails(req.GetAccountId(), req.session, req.ctx, logger, mailboxId, offset, limit, true, g.maxBodyValueBytes)
emails, sessionState, jerr := g.jmap.GetAllEmails(accountId, req.session, req.ctx, logger, mailboxId, offset, limit, true, g.maxBodyValueBytes)
if jerr != nil {
return req.errorResponseFromJmap(jerr)
}
@@ -115,8 +127,14 @@ func (g *Groupware) GetMessagesById(w http.ResponseWriter, r *http.Request) {
return req.parameterErrorResponse(UriParamMessageId, fmt.Sprintf("Invalid value for path parameter '%v': '%s': %s", UriParamMessageId, log.SafeString(id), "empty list of mail ids"))
}
logger := log.From(req.logger.With().Str("id", log.SafeString(id)))
emails, sessionState, jerr := g.jmap.GetEmails(req.GetAccountId(), req.session, req.ctx, logger, ids, true, g.maxBodyValueBytes)
accountId, err := req.GetAccountIdForMail()
if err != nil {
return errorResponse(err)
}
logger := log.From(req.logger.With().Str("id", log.SafeString(id)).Str(logAccountId, log.SafeString(accountId)))
emails, sessionState, jerr := g.jmap.GetEmails(accountId, req.session, req.ctx, logger, ids, true, g.maxBodyValueBytes)
if jerr != nil {
return req.errorResponseFromJmap(jerr)
}
@@ -135,9 +153,16 @@ func (g *Groupware) getMessagesSince(w http.ResponseWriter, r *http.Request, sin
if ok {
l = l.Uint(QueryParamMaxChanges, maxChanges)
}
accountId, err := req.GetAccountIdForMail()
if err != nil {
return errorResponse(err)
}
l = l.Str(logAccountId, log.SafeString(accountId))
logger := log.From(l)
emails, sessionState, jerr := g.jmap.GetEmailsSince(req.GetAccountId(), req.session, req.ctx, logger, since, true, g.maxBodyValueBytes, maxChanges)
emails, sessionState, jerr := g.jmap.GetEmailsSince(accountId, req.session, req.ctx, logger, since, true, g.maxBodyValueBytes, maxChanges)
if jerr != nil {
return req.errorResponseFromJmap(jerr)
}
@@ -328,7 +353,13 @@ func (g *Groupware) searchMessages(w http.ResponseWriter, r *http.Request) {
logger = log.From(logger.With().Bool(QueryParamSearchFetchBodies, fetchBodies))
}
results, sessionState, jerr := g.jmap.QueryEmailsWithSnippets(req.GetAccountId(), filter, req.session, req.ctx, logger, offset, limit, fetchBodies, g.maxBodyValueBytes)
accountId, err := req.GetAccountIdForMail()
if err != nil {
return errorResponse(err)
}
logger = log.From(logger.With().Str(logAccountId, accountId))
results, sessionState, jerr := g.jmap.QueryEmailsWithSnippets(accountId, filter, req.session, req.ctx, logger, offset, limit, fetchBodies, g.maxBodyValueBytes)
if jerr != nil {
return req.errorResponseFromJmap(jerr)
}
@@ -355,7 +386,13 @@ func (g *Groupware) searchMessages(w http.ResponseWriter, r *http.Request) {
QueryState: results.QueryState,
}, sessionState, results.QueryState)
} else {
results, sessionState, jerr := g.jmap.QueryEmailSnippets(req.GetAccountId(), filter, req.session, req.ctx, logger, offset, limit)
accountId, err := req.GetAccountIdForMail()
if err != nil {
return errorResponse(err)
}
logger = log.From(logger.With().Str(logAccountId, accountId))
results, sessionState, jerr := g.jmap.QueryEmailSnippets(accountId, filter, req.session, req.ctx, logger, offset, limit)
if jerr != nil {
return req.errorResponseFromJmap(jerr)
}
@@ -400,6 +437,12 @@ func (g *Groupware) CreateMessage(w http.ResponseWriter, r *http.Request) {
g.respond(w, r, func(req Request) Response {
logger := req.logger
accountId, gwerr := req.GetAccountIdForMail()
if gwerr != nil {
return errorResponse(gwerr)
}
logger = log.From(logger.With().Str(logAccountId, accountId))
var body MessageCreation
err := req.body(&body)
if err != nil {
@@ -427,7 +470,7 @@ func (g *Groupware) CreateMessage(w http.ResponseWriter, r *http.Request) {
BodyValues: body.BodyValues,
}
created, sessionState, jerr := g.jmap.CreateEmail(req.GetAccountId(), create, req.session, req.ctx, logger)
created, sessionState, jerr := g.jmap.CreateEmail(accountId, create, req.session, req.ctx, logger)
if jerr != nil {
return req.errorResponseFromJmap(jerr)
}
@@ -443,6 +486,12 @@ func (g *Groupware) UpdateMessage(w http.ResponseWriter, r *http.Request) {
l := req.logger.With()
l.Str(UriParamMessageId, messageId)
accountId, gwerr := req.GetAccountIdForMail()
if gwerr != nil {
return errorResponse(gwerr)
}
l.Str(logAccountId, accountId)
logger := log.From(l)
var body map[string]any
@@ -455,7 +504,7 @@ func (g *Groupware) UpdateMessage(w http.ResponseWriter, r *http.Request) {
messageId: body,
}
result, sessionState, jerr := g.jmap.UpdateEmails(req.GetAccountId(), updates, req.session, req.ctx, logger)
result, sessionState, jerr := g.jmap.UpdateEmails(accountId, updates, req.session, req.ctx, logger)
if jerr != nil {
return req.errorResponseFromJmap(jerr)
}
@@ -481,9 +530,16 @@ func (g *Groupware) DeleteMessage(w http.ResponseWriter, r *http.Request) {
l := req.logger.With()
l.Str(UriParamMessageId, messageId)
accountId, gwerr := req.GetAccountIdForMail()
if gwerr != nil {
return errorResponse(gwerr)
}
l.Str(logAccountId, accountId)
logger := log.From(l)
_, sessionState, jerr := g.jmap.DeleteEmails(req.GetAccountId(), []string{messageId}, req.session, req.ctx, logger)
_, sessionState, jerr := g.jmap.DeleteEmails(accountId, []string{messageId}, req.session, req.ctx, logger)
if jerr != nil {
return req.errorResponseFromJmap(jerr)
}
@@ -546,19 +602,33 @@ func (g *Groupware) RelatedToMessage(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, UriParamMessageId)
g.respond(w, r, func(req Request) Response {
limit, _, err := req.parseUIntParam(QueryParamLimit, 10) // TODO configurable default limit
l := req.logger.With().Str(logEmailId, log.SafeString(id))
limit, ok, err := req.parseUIntParam(QueryParamLimit, 10) // TODO configurable default limit
if err != nil {
return errorResponse(err)
}
if ok {
l = l.Uint("limit", limit)
}
days, _, err := req.parseUIntParam(QueryParamDays, 5) // TODO configurable default days
days, ok, err := req.parseUIntParam(QueryParamDays, 5) // TODO configurable default days
if err != nil {
return errorResponse(err)
}
if ok {
l = l.Uint("days", days)
}
accountId, gwerr := req.GetAccountIdForMail()
if gwerr != nil {
return errorResponse(gwerr)
}
l = l.Str(logAccountId, accountId)
logger := log.From(l)
reqId := req.GetRequestId()
accountId := req.GetAccountId()
logger := log.From(req.logger.With().Str(logEmailId, log.SafeString(id)))
getEmailsBefore := time.Now()
emails, sessionState, jerr := g.jmap.GetEmails(accountId, req.session, req.ctx, logger, []string{id}, true, g.maxBodyValueBytes)
getEmailsDuration := time.Since(getEmailsBefore)
@@ -4,6 +4,7 @@ import (
"net/http"
"github.com/opencloud-eu/opencloud/pkg/jmap"
"github.com/opencloud-eu/opencloud/pkg/log"
)
// When the request succeeds.
@@ -30,9 +31,15 @@ type SwaggerGetVacationResponse200 struct {
// 500: ErrorResponse500
func (g *Groupware) GetVacation(w http.ResponseWriter, r *http.Request) {
g.respond(w, r, func(req Request) Response {
res, sessionState, err := g.jmap.GetVacationResponse(req.GetAccountId(), req.session, req.ctx, req.logger)
accountId, err := req.GetAccountIdForVacationResponse()
if err != nil {
return req.errorResponseFromJmap(err)
return errorResponse(err)
}
logger := log.From(req.logger.With().Str(logAccountId, accountId))
res, sessionState, jerr := g.jmap.GetVacationResponse(accountId, req.session, req.ctx, logger)
if jerr != nil {
return req.errorResponseFromJmap(jerr)
}
return etagResponse(res, sessionState, res.State)
})
@@ -66,7 +73,13 @@ func (g *Groupware) SetVacation(w http.ResponseWriter, r *http.Request) {
return errorResponse(err)
}
res, sessionState, jerr := g.jmap.SetVacationResponse(req.GetAccountId(), body, req.session, req.ctx, req.logger)
accountId, err := req.GetAccountIdForVacationResponse()
if err != nil {
return errorResponse(err)
}
logger := log.From(req.logger.With().Str(logAccountId, accountId))
res, sessionState, jerr := g.jmap.SetVacationResponse(accountId, body, req.session, req.ctx, logger)
if jerr != nil {
return req.errorResponseFromJmap(jerr)
}
@@ -162,6 +162,7 @@ const (
ErrorCodeInvalidRequestParameter = "INVPAR"
ErrorCodeInvalidRequestBody = "INVBDY"
ErrorCodeNonExistingAccount = "INVACC"
ErrorCodeIndeterminateAccount = "INDACC"
ErrorCodeApiInconsistency = "APIINC"
ErrorCodeInvalidUserRequest = "INVURQ"
)
@@ -275,12 +276,18 @@ var (
Title: "Invalid Request",
Detail: "The request is invalid.",
}
ErrorNonExistingAccount = GroupwareError{
ErrorIndeterminateAccount = GroupwareError{
Status: http.StatusBadRequest,
Code: ErrorCodeNonExistingAccount,
Title: "Invalid Account Parameter",
Detail: "The account the request is for does not exist.",
}
ErrorNonExistingAccount = GroupwareError{
Status: http.StatusBadRequest,
Code: ErrorCodeIndeterminateAccount,
Title: "Failed to determine Account",
Detail: "The account the request is for could not be determined.",
}
ErrorApiInconsistency = GroupwareError{
Status: http.StatusInternalServerError,
Code: ErrorCodeApiInconsistency,
@@ -5,15 +5,12 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"sync/atomic"
"time"
"github.com/go-chi/chi/v5"
chimiddleware "github.com/go-chi/chi/v5/middleware"
"github.com/go-chi/render"
"github.com/r3labs/sse/v2"
"github.com/rs/zerolog"
@@ -29,12 +26,12 @@ import (
"github.com/opencloud-eu/opencloud/services/groupware/pkg/config"
"github.com/opencloud-eu/opencloud/services/groupware/pkg/metrics"
groupwaremiddleware "github.com/opencloud-eu/opencloud/services/groupware/pkg/middleware"
)
const (
logUsername = "username" // this should match jmap.logUsername to avoid having the field twice in the logs under different keys
logUserId = "user-id"
logAccountId = "account-id"
logErrorId = "error-id"
logErrorCode = "code"
logErrorStatus = "status"
@@ -129,6 +126,28 @@ type Event struct {
Body any
}
type groupwareHttpJmapApiClientMetricsRecorder struct {
m *metrics.Metrics
}
func (r groupwareHttpJmapApiClientMetricsRecorder) OnSuccessfulRequest(endpoint string, status int) {
r.m.SuccessfulRequestPerEndpointCounter.With(metrics.Endpoint(endpoint)).Inc()
}
func (r groupwareHttpJmapApiClientMetricsRecorder) OnFailedRequest(endpoint string, err error) {
r.m.FailedRequestPerEndpointCounter.With(metrics.Endpoint(endpoint)).Inc()
}
func (r groupwareHttpJmapApiClientMetricsRecorder) OnFailedRequestWithStatus(endpoint string, status int) {
r.m.FailedRequestStatusPerEndpointCounter.With(metrics.EndpointAndStatus(endpoint, status)).Inc()
}
func (r groupwareHttpJmapApiClientMetricsRecorder) OnResponseBodyReadingError(endpoint string, err error) {
r.m.ResponseBodyReadingErrorPerEndpointCounter.With(metrics.Endpoint(endpoint)).Inc()
}
func (r groupwareHttpJmapApiClientMetricsRecorder) OnResponseBodyUnmarshallingError(endpoint string, err error) {
r.m.ResponseBodyUnmarshallingErrorPerEndpointCounter.With(metrics.Endpoint(endpoint)).Inc()
}
var _ jmap.HttpJmapApiClientEventListener = groupwareHttpJmapApiClientMetricsRecorder{}
func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux, prometheusRegistry prometheus.Registerer) (*Groupware, error) {
baseUrl, err := url.Parse(config.Mail.BaseUrl)
if err != nil {
@@ -136,6 +155,8 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux, prome
return nil, GroupwareInitializationError{Message: fmt.Sprintf("failed to parse configured Mail.BaseUrl '%s'", config.Mail.BaseUrl), Err: err}
}
sessionUrl := baseUrl.JoinPath(".well-known", "jmap")
masterUsername := config.Mail.Master.Username
if masterUsername == "" {
logger.Error().Msg("failed to parse empty Mail.Master.Username")
@@ -163,7 +184,7 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux, prome
insecureTls := true // TODO make configurable
m := metrics.New(logger)
m := metrics.New(prometheusRegistry, logger)
// TODO add timeouts and other meaningful configuration settings for the HTTP client
tr := http.DefaultTransport.(*http.Transport).Clone()
@@ -177,16 +198,13 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux, prome
userProvider := NewRevaContextUsernameProvider()
api := jmap.NewHttpJmapApiClient(
*baseUrl,
jmapMetricsAdapter := groupwareHttpJmapApiClientMetricsRecorder{m: m}
api := jmap.NewHttpJmapClient(
&c,
masterUsername,
masterPassword,
jmap.HttpJmapApiClientMetrics{
SuccessfulRequestPerEndpointCounter: m.SuccessfulRequestPerEndpointCounter,
FailedRequestPerEndpointCounter: m.FailedRequestPerEndpointCounter,
FailedRequestStatusPerEndpointCounter: m.FailedRequestStatusPerEndpointCounter,
},
jmapMetricsAdapter,
)
jmapClient := jmap.NewClient(api, api, api)
@@ -197,6 +215,10 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux, prome
logger: logger,
jmapClient: &jmapClient,
errorTtl: sessionFailureCacheTtl,
sessionUrlProvider: func(username string) (*url.URL, *GroupwareError) {
// here is where we would implement server sharding
return sessionUrl, nil
},
}
sessionCache = ttlcache.New(
@@ -238,36 +260,20 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux, prome
sessionEventListener := sessionEventListener{
sessionCache: sessionCache,
logger: logger,
counter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: metrics.Namespace,
Subsystem: metrics.Subsystem,
Name: "outdated_sessions_count",
Help: "Counts outdated session events",
}),
counter: m.OutdatedSessionsCounter,
}
jmapClient.AddSessionEventListener(&sessionEventListener)
// A channel to process SSE Events with a single worker.
eventChannel := make(chan Event, eventChannelSize)
{
totalWorkerBufferMetric, err := prometheus.NewConstMetric(prometheus.NewDesc(
prometheus.BuildFQName(metrics.Namespace, metrics.Subsystem, "event_buffer_size"),
"Size of the buffer channel for server-sent events to process",
nil,
nil,
), prometheus.GaugeValue, float64(eventChannelSize))
eventBufferSizeMetric, err := prometheus.NewConstMetric(m.EventBufferSizeDesc, prometheus.GaugeValue, float64(eventChannelSize))
if err != nil {
logger.Warn().Err(err).Msg("failed to create event_buffer_size metric")
logger.Warn().Err(err).Msgf("failed to create metric %v", m.EventBufferSizeDesc.String())
} else {
prometheusRegistry.Register(metrics.ConstMetricCollector{Metric: totalWorkerBufferMetric})
prometheusRegistry.Register(metrics.ConstMetricCollector{Metric: eventBufferSizeMetric})
}
prometheusRegistry.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: metrics.Namespace,
Subsystem: metrics.Subsystem,
Name: "event_buffer_queued",
Help: "Number of queued server-sent events",
}, func() float64 {
prometheusRegistry.Register(prometheus.NewGaugeFunc(m.EventBufferQueuedOpts, func() float64 {
return float64(len(eventChannel))
}))
}
@@ -282,60 +288,35 @@ func NewGroupware(config *config.Config, logger *log.Logger, mux *chi.Mux, prome
sseServer.OnUnsubscribe = func(streamID string, sub *sse.Subscriber) {
sseSubscribers.Add(-1)
}
prometheusRegistry.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: metrics.Namespace,
Subsystem: metrics.Subsystem,
Name: "sse_subscribers",
Help: "Number of subscribers for server-sent event streams",
}, func() float64 {
prometheusRegistry.Register(prometheus.NewGaugeFunc(m.SSESubscribersOpts, func() float64 {
return float64(sseSubscribers.Load())
}))
}
jobsChannel := make(chan Job, workerQueueSize)
{
totalWorkerBufferMetric, err := prometheus.NewConstMetric(prometheus.NewDesc(
prometheus.BuildFQName(metrics.Namespace, metrics.Subsystem, "workers_buffer_size"),
"Size of the buffer channel for background worker jobs",
nil,
nil,
), prometheus.GaugeValue, float64(workerQueueSize))
totalWorkerBufferMetric, err := prometheus.NewConstMetric(m.WorkersBufferSizeDesc, prometheus.GaugeValue, float64(workerQueueSize))
if err != nil {
logger.Warn().Err(err).Msg("failed to create workers_buffer_size metric")
logger.Warn().Err(err).Msgf("failed to create metric %v", m.WorkersBufferSizeDesc.String())
} else {
prometheusRegistry.Register(metrics.ConstMetricCollector{Metric: totalWorkerBufferMetric})
}
prometheusRegistry.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: metrics.Namespace,
Subsystem: metrics.Subsystem,
Name: "workers_buffer_queued",
Help: "Number of queued background jobs",
}, func() float64 {
prometheusRegistry.Register(prometheus.NewGaugeFunc(m.WorkersBufferQueuedOpts, func() float64 {
return float64(len(jobsChannel))
}))
}
var busyWorkers atomic.Int32
{
totalWorkersMetric, err := prometheus.NewConstMetric(prometheus.NewDesc(
prometheus.BuildFQName(metrics.Namespace, metrics.Subsystem, "workers_total"),
"Total amount of background job workers",
nil,
nil,
), prometheus.GaugeValue, float64(workerPoolSize))
totalWorkersMetric, err := prometheus.NewConstMetric(m.TotalWorkersDesc, prometheus.GaugeValue, float64(workerPoolSize))
if err != nil {
logger.Warn().Err(err).Msg("failed to create workers_total metric")
logger.Warn().Err(err).Msgf("failed to create metric %v", m.TotalWorkersDesc.String())
} else {
prometheusRegistry.Register(metrics.ConstMetricCollector{Metric: totalWorkersMetric})
}
prometheusRegistry.Register(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: metrics.Namespace,
Subsystem: metrics.Subsystem,
Name: "workers_busy",
Help: "Number of background job workers that are currently busy executing jobs",
}, func() float64 {
prometheusRegistry.Register(prometheus.NewGaugeFunc(m.BusyWorkersOpts, func() float64 {
return float64(busyWorkers.Load())
}))
}
@@ -382,7 +363,7 @@ func (g *Groupware) worker(jobs <-chan Job, busy *atomic.Int32) {
logger := log.From(job.logger.With().Str(logJobDescription, job.description).Uint64(logJobId, job.id))
job.job(job.id, logger)
if logger.Trace().Enabled() {
logger.Trace().Msgf("finished job %d [%s] in %v", job.id, job.description, time.Since(before)) // TODO remove
logger.Trace().Msgf("finished job %d [%s] in %v", job.id, job.description, time.Since(before))
}
busy.Add(-1)
}
@@ -469,274 +450,6 @@ func (g *Groupware) session(user User, _ *http.Request, _ context.Context, _ *lo
return jmap.Session{}, false, nil
}
// using a wrapper class for requests, to group multiple parameters, really to avoid crowding the
// API of handlers but also to make it easier to expand it in the future without having to modify
// the parameter list of every single handler function
type Request struct {
g *Groupware
user User
r *http.Request
ctx context.Context
logger *log.Logger
session *jmap.Session
}
type Response struct {
body any
status int
err *Error
etag jmap.State
sessionState jmap.SessionState
}
func errorResponse(err *Error) Response {
return Response{
body: nil,
err: err,
etag: "",
sessionState: "",
}
}
func response(body any, sessionState jmap.SessionState) Response {
return Response{
body: body,
err: nil,
etag: jmap.State(sessionState),
sessionState: sessionState,
}
}
func etagResponse(body any, sessionState jmap.SessionState, etag jmap.State) Response {
return Response{
body: body,
err: nil,
etag: etag,
sessionState: sessionState,
}
}
func etagOnlyResponse(body any, etag jmap.State) Response {
return Response{
body: body,
err: nil,
etag: etag,
sessionState: "",
}
}
func noContentResponse(sessionState jmap.SessionState) Response {
return Response{
body: nil,
status: http.StatusNoContent,
err: nil,
etag: jmap.State(sessionState),
sessionState: sessionState,
}
}
func acceptedResponse(sessionState jmap.SessionState) Response {
return Response{
body: nil,
status: http.StatusAccepted,
err: nil,
etag: jmap.State(sessionState),
sessionState: sessionState,
}
}
func timeoutResponse(sessionState jmap.SessionState) Response {
return Response{
body: nil,
status: http.StatusRequestTimeout,
err: nil,
etag: "",
sessionState: sessionState,
}
}
func notFoundResponse(sessionState jmap.SessionState) Response {
return Response{
body: nil,
status: http.StatusNotFound,
err: nil,
etag: jmap.State(sessionState),
sessionState: sessionState,
}
}
func (r Request) push(typ string, event any) {
r.g.push(r.user, typ, event)
}
func (r Request) GetUser() User {
return r.user
}
func (r Request) GetRequestId() string {
return chimiddleware.GetReqID(r.ctx)
}
func (r Request) GetTraceId() string {
return groupwaremiddleware.GetTraceID(r.ctx)
}
func (r Request) GetAccountId() string {
accountId := chi.URLParam(r.r, UriParamAccount)
return r.session.MailAccountId(accountId)
}
func (r Request) GetAccount() (jmap.SessionAccount, *Error) {
accountId := r.GetAccountId()
account, ok := r.session.Accounts[accountId]
if !ok {
r.logger.Debug().Msgf("failed to find account '%v'", accountId)
// TODO metric for inexistent accounts
return jmap.SessionAccount{}, apiError(r.errorId(), ErrorNonExistingAccount,
withDetail(fmt.Sprintf("The account '%v' does not exist", log.SafeString(accountId))),
withSource(&ErrorSource{Parameter: UriParamAccount}),
)
}
return account, nil
}
func (r Request) parameterError(param string, detail string) *Error {
return r.observedParameterError(ErrorInvalidRequestParameter,
withDetail(detail),
withSource(&ErrorSource{Parameter: param}))
}
func (r Request) parameterErrorResponse(param string, detail string) Response {
return errorResponse(r.parameterError(param, detail))
}
func (r Request) parseIntParam(param string, defaultValue int) (int, bool, *Error) {
q := r.r.URL.Query()
if !q.Has(param) {
return defaultValue, false, nil
}
str := q.Get(param)
if str == "" {
return defaultValue, false, nil
}
value, err := strconv.ParseInt(str, 10, 0)
if err != nil {
// don't include the original error, as it leaks too much about our implementation, e.g.:
// strconv.ParseInt: parsing \"a\": invalid syntax
msg := fmt.Sprintf("Invalid numeric value for query parameter '%v': '%s'", param, log.SafeString(str))
return defaultValue, true, r.observedParameterError(ErrorInvalidRequestParameter,
withDetail(msg),
withSource(&ErrorSource{Parameter: param}),
)
}
return int(value), true, nil
}
func (r Request) parseUIntParam(param string, defaultValue uint) (uint, bool, *Error) {
q := r.r.URL.Query()
if !q.Has(param) {
return defaultValue, false, nil
}
str := q.Get(param)
if str == "" {
return defaultValue, false, nil
}
value, err := strconv.ParseUint(str, 10, 0)
if err != nil {
// don't include the original error, as it leaks too much about our implementation, e.g.:
// strconv.ParseInt: parsing \"a\": invalid syntax
msg := fmt.Sprintf("Invalid numeric value for query parameter '%v': '%s'", param, log.SafeString(str))
return defaultValue, true, r.observedParameterError(ErrorInvalidRequestParameter,
withDetail(msg),
withSource(&ErrorSource{Parameter: param}),
)
}
return uint(value), true, nil
}
func (r Request) parseDateParam(param string) (time.Time, bool, *Error) {
q := r.r.URL.Query()
if !q.Has(param) {
return time.Time{}, false, nil
}
str := q.Get(param)
if str == "" {
return time.Time{}, false, nil
}
t, err := time.Parse(time.RFC3339, str)
if err != nil {
msg := fmt.Sprintf("Invalid RFC3339 value for query parameter '%v': '%s': %s", param, log.SafeString(str), err.Error())
return time.Time{}, true, r.observedParameterError(ErrorInvalidRequestParameter,
withDetail(msg),
withSource(&ErrorSource{Parameter: param}),
)
}
return t, true, nil
}
func (r Request) parseBoolParam(param string, defaultValue bool) (bool, bool, *Error) {
q := r.r.URL.Query()
if !q.Has(param) {
return defaultValue, false, nil
}
str := q.Get(param)
if str == "" {
return defaultValue, false, nil
}
b, err := strconv.ParseBool(str)
if err != nil {
msg := fmt.Sprintf("Invalid boolean value for query parameter '%v': '%s': %s", param, log.SafeString(str), err.Error())
return defaultValue, true, r.observedParameterError(ErrorInvalidRequestParameter,
withDetail(msg),
withSource(&ErrorSource{Parameter: param}),
)
}
return b, true, nil
}
func (r Request) body(target any) *Error {
body := r.r.Body
defer func(b io.ReadCloser) {
err := b.Close()
if err != nil {
r.logger.Error().Err(err).Msg("failed to close request body")
}
}(body)
err := json.NewDecoder(body).Decode(target)
if err != nil {
return r.observedParameterError(ErrorInvalidRequestBody, withSource(&ErrorSource{Pointer: "/"})) // we don't get any details here
}
return nil
}
func (r Request) observe(obs prometheus.Observer, value float64) {
metrics.WithExemplar(obs, value, r.GetRequestId(), r.GetTraceId())
}
func (r Request) observeParameterError(err *Error) *Error {
if err != nil {
r.g.metrics.ParameterErrorCounter.WithLabelValues(err.Code).Inc()
}
return err
}
func (r Request) observeJmapError(jerr jmap.Error) jmap.Error {
if jerr != nil {
r.g.metrics.JmapErrorCounter.WithLabelValues(r.session.JmapEndpoint, strconv.Itoa(jerr.Code())).Inc()
}
return jerr
}
func (g *Groupware) log(error *Error) {
var level *zerolog.Event
if error.NumStatus < 300 {
@@ -0,0 +1,259 @@
package groupware
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"time"
"github.com/go-chi/chi/v5"
chimiddleware "github.com/go-chi/chi/v5/middleware"
"github.com/prometheus/client_golang/prometheus"
"github.com/opencloud-eu/opencloud/pkg/jmap"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/services/groupware/pkg/metrics"
groupwaremiddleware "github.com/opencloud-eu/opencloud/services/groupware/pkg/middleware"
)
// using a wrapper class for requests, to group multiple parameters, really to avoid crowding the
// API of handlers but also to make it easier to expand it in the future without having to modify
// the parameter list of every single handler function
type Request struct {
g *Groupware
user User
r *http.Request
ctx context.Context
logger *log.Logger
session *jmap.Session
}
func (r Request) push(typ string, event any) {
r.g.push(r.user, typ, event)
}
func (r Request) GetUser() User {
return r.user
}
func (r Request) GetRequestId() string {
return chimiddleware.GetReqID(r.ctx)
}
func (r Request) GetTraceId() string {
return groupwaremiddleware.GetTraceID(r.ctx)
}
var (
errNoPrimaryAccountFallback = errors.New("no primary account fallback")
errNoPrimaryAccountForMail = errors.New("no primary account for mail")
errNoPrimaryAccountForBlob = errors.New("no primary account for blob")
errNoPrimaryAccountForVacationResponse = errors.New("no primary account for vacation response")
errNoPrimaryAccountForSubmission = errors.New("no primary account for submission")
// errNoPrimaryAccountForSieve = errors.New("no primary account for sieve")
// errNoPrimaryAccountForQuota = errors.New("no primary account for quota")
// errNoPrimaryAccountForWebsocket = errors.New("no primary account for websocket")
)
func (r Request) GetAccountIdWithoutFallback() (string, *Error) {
accountId := chi.URLParam(r.r, UriParamAccountId)
if accountId == "" || accountId == defaultAccountId {
r.logger.Error().Err(errNoPrimaryAccountFallback).Msg("failed to determine the accountId")
return "", apiError(r.errorId(), ErrorNonExistingAccount,
withDetail("Failed to determine the account to use"),
withSource(&ErrorSource{Parameter: UriParamAccountId}),
)
}
return accountId, nil
}
func (r Request) getAccountId(fallback string, err error) (string, *Error) {
accountId := chi.URLParam(r.r, UriParamAccountId)
if accountId == "" || accountId == defaultAccountId {
accountId = fallback
}
if accountId == "" {
r.logger.Error().Err(err).Msg("failed to determine the accountId")
return "", apiError(r.errorId(), ErrorNonExistingAccount,
withDetail("Failed to determine the account to use"),
withSource(&ErrorSource{Parameter: UriParamAccountId}),
)
}
return accountId, nil
}
func (r Request) GetAccountIdForMail() (string, *Error) {
return r.getAccountId(r.session.PrimaryAccounts.Mail, errNoPrimaryAccountForMail)
}
func (r Request) GetAccountIdForBlob() (string, *Error) {
return r.getAccountId(r.session.PrimaryAccounts.Blob, errNoPrimaryAccountForBlob)
}
func (r Request) GetAccountIdForVacationResponse() (string, *Error) {
return r.getAccountId(r.session.PrimaryAccounts.VacationResponse, errNoPrimaryAccountForVacationResponse)
}
func (r Request) GetAccountIdForSubmission() (string, *Error) {
return r.getAccountId(r.session.PrimaryAccounts.Blob, errNoPrimaryAccountForSubmission)
}
func (r Request) GetAccountForMail() (jmap.SessionAccount, *Error) {
accountId, err := r.GetAccountIdForMail()
if err != nil {
return jmap.SessionAccount{}, err
}
account, ok := r.session.Accounts[accountId]
if !ok {
r.logger.Debug().Msgf("failed to find account '%v'", accountId)
// TODO metric for inexistent accounts
return jmap.SessionAccount{}, apiError(r.errorId(), ErrorNonExistingAccount,
withDetail(fmt.Sprintf("The account '%v' does not exist", log.SafeString(accountId))),
withSource(&ErrorSource{Parameter: UriParamAccountId}),
)
}
return account, nil
}
func (r Request) parameterError(param string, detail string) *Error {
return r.observedParameterError(ErrorInvalidRequestParameter,
withDetail(detail),
withSource(&ErrorSource{Parameter: param}))
}
func (r Request) parameterErrorResponse(param string, detail string) Response {
return errorResponse(r.parameterError(param, detail))
}
func (r Request) parseIntParam(param string, defaultValue int) (int, bool, *Error) {
q := r.r.URL.Query()
if !q.Has(param) {
return defaultValue, false, nil
}
str := q.Get(param)
if str == "" {
return defaultValue, false, nil
}
value, err := strconv.ParseInt(str, 10, 0)
if err != nil {
// don't include the original error, as it leaks too much about our implementation, e.g.:
// strconv.ParseInt: parsing \"a\": invalid syntax
msg := fmt.Sprintf("Invalid numeric value for query parameter '%v': '%s'", param, log.SafeString(str))
return defaultValue, true, r.observedParameterError(ErrorInvalidRequestParameter,
withDetail(msg),
withSource(&ErrorSource{Parameter: param}),
)
}
return int(value), true, nil
}
func (r Request) parseUIntParam(param string, defaultValue uint) (uint, bool, *Error) {
q := r.r.URL.Query()
if !q.Has(param) {
return defaultValue, false, nil
}
str := q.Get(param)
if str == "" {
return defaultValue, false, nil
}
value, err := strconv.ParseUint(str, 10, 0)
if err != nil {
// don't include the original error, as it leaks too much about our implementation, e.g.:
// strconv.ParseInt: parsing \"a\": invalid syntax
msg := fmt.Sprintf("Invalid numeric value for query parameter '%v': '%s'", param, log.SafeString(str))
return defaultValue, true, r.observedParameterError(ErrorInvalidRequestParameter,
withDetail(msg),
withSource(&ErrorSource{Parameter: param}),
)
}
return uint(value), true, nil
}
func (r Request) parseDateParam(param string) (time.Time, bool, *Error) {
q := r.r.URL.Query()
if !q.Has(param) {
return time.Time{}, false, nil
}
str := q.Get(param)
if str == "" {
return time.Time{}, false, nil
}
t, err := time.Parse(time.RFC3339, str)
if err != nil {
msg := fmt.Sprintf("Invalid RFC3339 value for query parameter '%v': '%s': %s", param, log.SafeString(str), err.Error())
return time.Time{}, true, r.observedParameterError(ErrorInvalidRequestParameter,
withDetail(msg),
withSource(&ErrorSource{Parameter: param}),
)
}
return t, true, nil
}
func (r Request) parseBoolParam(param string, defaultValue bool) (bool, bool, *Error) {
q := r.r.URL.Query()
if !q.Has(param) {
return defaultValue, false, nil
}
str := q.Get(param)
if str == "" {
return defaultValue, false, nil
}
b, err := strconv.ParseBool(str)
if err != nil {
msg := fmt.Sprintf("Invalid boolean value for query parameter '%v': '%s': %s", param, log.SafeString(str), err.Error())
return defaultValue, true, r.observedParameterError(ErrorInvalidRequestParameter,
withDetail(msg),
withSource(&ErrorSource{Parameter: param}),
)
}
return b, true, nil
}
func (r Request) body(target any) *Error {
body := r.r.Body
defer func(b io.ReadCloser) {
err := b.Close()
if err != nil {
r.logger.Error().Err(err).Msg("failed to close request body")
}
}(body)
err := json.NewDecoder(body).Decode(target)
if err != nil {
return r.observedParameterError(ErrorInvalidRequestBody, withSource(&ErrorSource{Pointer: "/"})) // we don't get any details here
}
return nil
}
func (r Request) observe(obs prometheus.Observer, value float64) {
metrics.WithExemplar(obs, value, r.GetRequestId(), r.GetTraceId())
}
func (r Request) observeParameterError(err *Error) *Error {
if err != nil {
r.g.metrics.ParameterErrorCounter.WithLabelValues(err.Code).Inc()
}
return err
}
func (r Request) observeJmapError(jerr jmap.Error) jmap.Error {
if jerr != nil {
r.g.metrics.JmapErrorCounter.WithLabelValues(r.session.JmapEndpoint, strconv.Itoa(jerr.Code())).Inc()
}
return jerr
}
@@ -0,0 +1,95 @@
package groupware
import (
"net/http"
"github.com/opencloud-eu/opencloud/pkg/jmap"
)
type Response struct {
body any
status int
err *Error
etag jmap.State
sessionState jmap.SessionState
}
func errorResponse(err *Error) Response {
return Response{
body: nil,
err: err,
etag: "",
sessionState: "",
}
}
func response(body any, sessionState jmap.SessionState) Response {
return Response{
body: body,
err: nil,
etag: jmap.State(sessionState),
sessionState: sessionState,
}
}
func etagResponse(body any, sessionState jmap.SessionState, etag jmap.State) Response {
return Response{
body: body,
err: nil,
etag: etag,
sessionState: sessionState,
}
}
func etagOnlyResponse(body any, etag jmap.State) Response {
return Response{
body: body,
err: nil,
etag: etag,
sessionState: "",
}
}
func noContentResponse(sessionState jmap.SessionState) Response {
return Response{
body: nil,
status: http.StatusNoContent,
err: nil,
etag: jmap.State(sessionState),
sessionState: sessionState,
}
}
/*
func acceptedResponse(sessionState jmap.SessionState) Response {
return Response{
body: nil,
status: http.StatusAccepted,
err: nil,
etag: jmap.State(sessionState),
sessionState: sessionState,
}
}
*/
/*
func timeoutResponse(sessionState jmap.SessionState) Response {
return Response{
body: nil,
status: http.StatusRequestTimeout,
err: nil,
etag: "",
sessionState: sessionState,
}
}
*/
func notFoundResponse(sessionState jmap.SessionState) Response {
return Response{
body: nil,
status: http.StatusNotFound,
err: nil,
etag: jmap.State(sessionState),
sessionState: sessionState,
}
}
@@ -5,7 +5,9 @@ import (
)
const (
UriParamAccount = "accountid"
defaultAccountId = "*"
UriParamAccountId = "accountid"
UriParamMailboxId = "mailbox"
UriParamMessageId = "messageid"
UriParamBlobId = "blobid"
@@ -1,6 +1,7 @@
package groupware
import (
"net/url"
"time"
"github.com/jellydator/ttlcache/v3"
@@ -69,17 +70,23 @@ func (s failedSession) Since() time.Time {
var _ cachedSession = failedSession{}
type sessionCacheLoader struct {
logger *log.Logger
jmapClient *jmap.Client
errorTtl time.Duration
logger *log.Logger
sessionUrlProvider func(username string) (*url.URL, *GroupwareError)
jmapClient *jmap.Client
errorTtl time.Duration
}
func (l *sessionCacheLoader) Load(c *ttlcache.Cache[sessionKey, cachedSession], key sessionKey) *ttlcache.Item[sessionKey, cachedSession] {
username := usernameFromSessionKey(key)
session, err := l.jmapClient.FetchSession(username, l.logger)
if err != nil {
l.logger.Warn().Str("username", username).Err(err).Msgf("failed to create session for '%v'", key)
return c.Set(key, failedSession{since: time.Now(), err: groupwareErrorFromJmap(err)}, l.errorTtl)
sessionUrl, gwerr := l.sessionUrlProvider(username)
if gwerr != nil {
l.logger.Warn().Str("username", username).Str("code", gwerr.Code).Msgf("failed to determine session URL for '%v'", key)
return c.Set(key, failedSession{since: time.Now(), err: gwerr}, l.errorTtl)
}
session, jerr := l.jmapClient.FetchSession(sessionUrl, username, l.logger)
if jerr != nil {
l.logger.Warn().Str("username", username).Err(jerr).Msgf("failed to create session for '%v'", key)
return c.Set(key, failedSession{since: time.Now(), err: groupwareErrorFromJmap(jerr)}, l.errorTtl)
} else {
l.logger.Debug().Str("username", username).Msgf("successfully created session for '%v'", key)
return c.Set(key, succeededSession{since: time.Now(), session: session}, ttlcache.DefaultTTL) // use the TTL configured on the Cache
+113 -16
View File
@@ -2,6 +2,7 @@ package metrics
import (
"reflect"
"strconv"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/prometheus/client_golang/prometheus"
@@ -17,16 +18,27 @@ const (
// Metrics defines the available metrics of this service.
type Metrics struct {
SessionCacheDesc *prometheus.Desc
SessionCacheDesc *prometheus.Desc
EventBufferSizeDesc *prometheus.Desc
EventBufferQueuedOpts prometheus.GaugeOpts
SSESubscribersOpts prometheus.GaugeOpts
WorkersBufferSizeDesc *prometheus.Desc
WorkersBufferQueuedOpts prometheus.GaugeOpts
TotalWorkersDesc *prometheus.Desc
BusyWorkersOpts prometheus.GaugeOpts
JmapErrorCounter *prometheus.CounterVec
ParameterErrorCounter *prometheus.CounterVec
AuthenticationFailureCounter prometheus.Counter
SessionFailureCounter prometheus.Counter
SSEEventsCounter *prometheus.CounterVec
OutdatedSessionsCounter prometheus.Counter
SuccessfulRequestPerEndpointCounter *prometheus.CounterVec
FailedRequestPerEndpointCounter *prometheus.CounterVec
FailedRequestStatusPerEndpointCounter *prometheus.CounterVec
SuccessfulRequestPerEndpointCounter *prometheus.CounterVec
FailedRequestPerEndpointCounter *prometheus.CounterVec
FailedRequestStatusPerEndpointCounter *prometheus.CounterVec
ResponseBodyReadingErrorPerEndpointCounter *prometheus.CounterVec
ResponseBodyUnmarshallingErrorPerEndpointCounter *prometheus.CounterVec
EmailByIdDuration *prometheus.HistogramVec
EmailSameSenderDuration *prometheus.HistogramVec
@@ -92,7 +104,7 @@ var Values = struct {
}
// New initializes the available metrics.
func New(logger *log.Logger) *Metrics {
func New(registerer prometheus.Registerer, logger *log.Logger) *Metrics {
m := &Metrics{
SessionCacheDesc: prometheus.NewDesc(
prometheus.BuildFQName(Namespace, Subsystem, "session_cache"),
@@ -100,6 +112,48 @@ func New(logger *log.Logger) *Metrics {
[]string{Labels.SessionCacheType},
nil,
),
EventBufferSizeDesc: prometheus.NewDesc(
prometheus.BuildFQName(Namespace, Subsystem, "event_buffer_size"),
"Size of the buffer channel for server-sent events to process",
nil,
nil,
),
EventBufferQueuedOpts: prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "event_buffer_queued",
Help: "Number of queued server-sent events",
},
SSESubscribersOpts: prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "sse_subscribers",
Help: "Number of subscribers for server-sent event streams",
},
WorkersBufferSizeDesc: prometheus.NewDesc(
prometheus.BuildFQName(Namespace, Subsystem, "workers_buffer_size"),
"Size of the buffer channel for background worker jobs",
nil,
nil,
),
WorkersBufferQueuedOpts: prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "workers_buffer_queued",
Help: "Number of queued background jobs",
},
TotalWorkersDesc: prometheus.NewDesc(
prometheus.BuildFQName(Namespace, Subsystem, "workers_total"),
"Total amount of background job workers",
nil,
nil,
),
BusyWorkersOpts: prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "workers_busy",
Help: "Number of background job workers that are currently busy executing jobs",
},
AuthenticationFailureCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: Subsystem,
@@ -148,12 +202,30 @@ func New(logger *log.Logger) *Metrics {
Name: "jmap_requests_failures_status_count",
Help: "Number of JMAP requests",
}, []string{Labels.Endpoint, Labels.HttpStatusCode}),
ResponseBodyReadingErrorPerEndpointCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "jmap_requests_body_reading_errors_count",
Help: "Number of JMAP body reading errors",
}, []string{Labels.Endpoint}),
ResponseBodyUnmarshallingErrorPerEndpointCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "jmap_requests_body_unmarshalling_errors_count",
Help: "Number of JMAP body unmarshalling errors",
}, []string{Labels.Endpoint}),
SSEEventsCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "sse_events_count",
Help: "Number of Server-Side Events that have been sent",
}, []string{Labels.SSEType}),
OutdatedSessionsCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "outdated_sessions_count",
Help: "Counts outdated session events",
}),
EmailByIdDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: Namespace,
Subsystem: Subsystem,
@@ -177,7 +249,7 @@ func New(logger *log.Logger) *Metrics {
}, []string{Labels.Endpoint}),
}
registerAll(m, logger)
registerAll(registerer, m, logger)
return m
}
@@ -186,23 +258,31 @@ func WithExemplar(obs prometheus.Observer, value float64, requestId string, trac
obs.(prometheus.ExemplarObserver).ObserveWithExemplar(value, prometheus.Labels{Labels.RequestId: requestId, Labels.TraceId: traceId})
}
func registerAll(m any, logger *log.Logger) {
func registerAll(registerer prometheus.Registerer, m any, logger *log.Logger) {
r := reflect.ValueOf(m)
if r.Kind() == reflect.Pointer {
r = r.Elem()
}
total := 0
succeeded := 0
failed := 0
for i := 0; i < r.NumField(); i++ {
n := r.Type().Field(i).Name
f := r.Field(i)
v := f.Interface()
c, ok := v.(prometheus.Collector)
if ok {
err := prometheus.Register(c)
total++
err := registerer.Register(c)
if err != nil {
failed++
logger.Warn().Err(err).Msgf("failed to register metric '%s' (%T)", n, c)
} else {
succeeded++
}
}
}
logger.Debug().Msgf("registered %d/%d metrics successfully (%d failed)", succeeded, total, failed)
}
type ConstMetricCollector struct {
@@ -221,26 +301,43 @@ type LoggingPrometheusRegisterer struct {
logger *log.Logger
}
func NewLoggingPrometheusRegisterer(delegate prometheus.Registerer, logger *log.Logger) LoggingPrometheusRegisterer {
return LoggingPrometheusRegisterer{
func NewLoggingPrometheusRegisterer(delegate prometheus.Registerer, logger *log.Logger) *LoggingPrometheusRegisterer {
return &LoggingPrometheusRegisterer{
delegate: delegate,
logger: logger,
}
}
func (r LoggingPrometheusRegisterer) Register(c prometheus.Collector) error {
func (r *LoggingPrometheusRegisterer) Register(c prometheus.Collector) error {
err := r.delegate.Register(c)
if err != nil {
r.logger.Warn().Err(err).Msgf("failed to register metric")
switch err.(type) {
case prometheus.AlreadyRegisteredError:
// silently ignore this error, as this case can happen when the suture service decides to restart
err = nil
default:
r.logger.Warn().Err(err).Msgf("failed to register metric")
}
}
return err
}
func (r LoggingPrometheusRegisterer) MustRegister(...prometheus.Collector) {
panic("don't use MustRegister")
func (r *LoggingPrometheusRegisterer) MustRegister(collectors ...prometheus.Collector) {
for _, c := range collectors {
r.Register(c)
}
}
func (r LoggingPrometheusRegisterer) Unregister(c prometheus.Collector) bool {
func (r *LoggingPrometheusRegisterer) Unregister(c prometheus.Collector) bool {
return r.delegate.Unregister(c)
}
var _ prometheus.Registerer = LoggingPrometheusRegisterer{}
var _ prometheus.Registerer = &LoggingPrometheusRegisterer{}
func Endpoint(endpoint string) prometheus.Labels {
return prometheus.Labels{Labels.Endpoint: endpoint}
}
func EndpointAndStatus(endpoint string, status int) prometheus.Labels {
return prometheus.Labels{Labels.Endpoint: endpoint, Labels.HttpStatusCode: strconv.Itoa(status)}
}
@@ -11,16 +11,29 @@ import (
func GroupwareLogger(logger log.Logger) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
level := logger.Debug()
if !level.Enabled() {
next.ServeHTTP(w, r)
return
}
start := time.Now()
wrap := middleware.NewWrapResponseWriter(w, r.ProtoMajor)
next.ServeHTTP(wrap, r)
level := logger.Debug()
err := recover()
if err != nil {
level = logger.Error()
}
if !level.Enabled() {
return
}
if err != nil {
switch e := err.(type) {
case error:
level = level.Err(e)
default:
level = level.Any("panic", e)
}
}
ctx := r.Context()
requestID := middleware.GetReqID(ctx)