add userlog tracing (#6772)

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
This commit is contained in:
Jörn Friedrich Dreyer
2023-07-12 10:57:40 +02:00
committed by GitHub
parent cf0dbfffc3
commit 138754749a
7 changed files with 92 additions and 28 deletions

View File

@@ -14,6 +14,7 @@ import (
"github.com/owncloud/ocis/v2/ocis-pkg/registry"
"github.com/owncloud/ocis/v2/ocis-pkg/service/debug"
ogrpc "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc"
"github.com/owncloud/ocis/v2/ocis-pkg/tracing"
"github.com/owncloud/ocis/v2/ocis-pkg/version"
ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0"
@@ -55,8 +56,14 @@ func Server(cfg *config.Config) *cli.Command {
},
Action: func(c *cli.Context) error {
logger := logging.Configure(cfg.Service.Name, cfg.Log)
tracerProvider, err := tracing.GetServiceTraceProvider(cfg.Tracing, cfg.Service.Name)
if err != nil {
return err
}
err := ogrpc.Configure(ogrpc.GetClientOptions(cfg.GRPCClientTLS)...)
err = ogrpc.Configure(
append(ogrpc.GetClientOptions(cfg.GRPCClientTLS), ogrpc.WithTraceProvider(tracerProvider))...,
)
if err != nil {
return err
}
@@ -97,6 +104,7 @@ func Server(cfg *config.Config) *cli.Command {
pool.WithTLSCACert(cfg.GRPCClientTLS.CACert),
pool.WithTLSMode(tm),
pool.WithRegistry(registry.GetRegistry()),
pool.WithTracerProvider(tracerProvider),
)
if err != nil {
return fmt.Errorf("could not get reva client selector: %s", err)
@@ -119,6 +127,7 @@ func Server(cfg *config.Config) *cli.Command {
http.Value(vClient),
http.Role(rClient),
http.RegisteredEvents(_registeredEvents),
http.TracerProvider(tracerProvider),
)
if err != nil {

View File

@@ -73,11 +73,3 @@ type HTTP struct {
type TokenManager struct {
JWTSecret string `yaml:"jwt_secret" env:"OCIS_JWT_SECRET;USERLOG_JWT_SECRET" desc:"The secret to mint and validate jwt tokens."`
}
// Tracing defines the available tracing configuration.
type Tracing struct {
Enabled bool `yaml:"enabled" env:"OCIS_TRACING_ENABLED;USERLOG_TRACING_ENABLED" desc:"Activates tracing."`
Type string `yaml:"type" env:"OCIS_TRACING_TYPE;USERLOG_TRACING_TYPE" desc:"The type of tracing. Defaults to '', which is the same as 'jaeger'. Allowed tracing types are 'jaeger' and '' as of now."`
Endpoint string `yaml:"endpoint" env:"OCIS_TRACING_ENDPOINT;USERLOG_TRACING_ENDPOINT" desc:"The endpoint of the tracing agent."`
Collector string `yaml:"collector" env:"OCIS_TRACING_COLLECTOR;USERLOG_TRACING_COLLECTOR" desc:"The HTTP endpoint for sending spans directly to a collector, i.e. http://jaeger-collector:14268/api/traces. Only used if the tracing endpoint is unset."`
}

View File

@@ -0,0 +1,21 @@
package config
import "github.com/owncloud/ocis/v2/ocis-pkg/tracing"
// Tracing defines the available tracing configuration.
type Tracing struct {
Enabled bool `yaml:"enabled" env:"OCIS_TRACING_ENABLED;USERLOG_TRACING_ENABLED" desc:"Activates tracing."`
Type string `yaml:"type" env:"OCIS_TRACING_TYPE;USERLOG_TRACING_TYPE" desc:"The type of tracing. Defaults to '', which is the same as 'jaeger'. Allowed tracing types are 'jaeger' and '' as of now."`
Endpoint string `yaml:"endpoint" env:"OCIS_TRACING_ENDPOINT;USERLOG_TRACING_ENDPOINT" desc:"The endpoint of the tracing agent."`
Collector string `yaml:"collector" env:"OCIS_TRACING_COLLECTOR;USERLOG_TRACING_COLLECTOR" desc:"The HTTP endpoint for sending spans directly to a collector, i.e. http://jaeger-collector:14268/api/traces. Only used if the tracing endpoint is unset."`
}
// Convert Tracing to the tracing package's Config struct.
func (t Tracing) Convert() tracing.Config {
return tracing.Config{
Enabled: t.Enabled,
Type: t.Type,
Endpoint: t.Endpoint,
Collector: t.Collector,
}
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/owncloud/ocis/v2/services/userlog/pkg/metrics"
"github.com/urfave/cli/v2"
"go-micro.dev/v4/store"
"go.opentelemetry.io/otel/trace"
)
// Option defines a single option function.
@@ -33,6 +34,7 @@ type Options struct {
ValueClient settingssvc.ValueService
RoleClient settingssvc.RoleService
RegisteredEvents []events.Unmarshaller
TracerProvider trace.TracerProvider
}
// newOptions initializes the available default options.
@@ -136,3 +138,10 @@ func Role(rs settingssvc.RoleService) Option {
o.RoleClient = rs
}
}
// TracerProvider provides a function to set the TracerProvider option
func TracerProvider(val trace.TracerProvider) Option {
return func(o *Options) {
o.TracerProvider = val
}
}

View File

@@ -11,8 +11,10 @@ import (
"github.com/owncloud/ocis/v2/ocis-pkg/cors"
"github.com/owncloud/ocis/v2/ocis-pkg/middleware"
"github.com/owncloud/ocis/v2/ocis-pkg/service/http"
"github.com/owncloud/ocis/v2/ocis-pkg/tracing"
"github.com/owncloud/ocis/v2/ocis-pkg/version"
svc "github.com/owncloud/ocis/v2/services/userlog/pkg/service"
"github.com/riandyrn/otelchi"
"go-micro.dev/v4"
)
@@ -42,7 +44,6 @@ func Server(opts ...Option) (http.Service, error) {
}
middlewares := []func(stdhttp.Handler) stdhttp.Handler{
middleware.TraceContext,
chimiddleware.RequestID,
middleware.Version(
"userlog",
@@ -68,6 +69,15 @@ func Server(opts ...Option) (http.Service, error) {
mux := chi.NewMux()
mux.Use(middlewares...)
mux.Use(
otelchi.Middleware(
"userlog",
otelchi.WithChiRoutes(mux),
otelchi.WithTracerProvider(options.TracerProvider),
otelchi.WithPropagators(tracing.GetPropagator()),
),
)
handle, err := svc.NewUserlogService(
svc.Logger(options.Logger),
svc.Consumer(options.Consumer),

View File

@@ -4,12 +4,12 @@ import (
"encoding/json"
"net/http"
"github.com/cs3org/reva/v2/pkg/ctx"
revactx "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/ocis-pkg/roles"
"github.com/owncloud/ocis/v2/services/graph/pkg/service/v0/errorcode"
settings "github.com/owncloud/ocis/v2/services/settings/pkg/service/v0"
"go.opentelemetry.io/otel/attribute"
)
// HeaderAcceptLanguage is the header where the client can set the locale
@@ -22,19 +22,25 @@ func (ul *UserlogService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// HandleGetEvents is the GET handler for events
func (ul *UserlogService) HandleGetEvents(w http.ResponseWriter, r *http.Request) {
u, ok := revactx.ContextGetUser(r.Context())
ctx, span := tracer.Start(r.Context(), "HandleGetEvents")
defer span.End()
u, ok := revactx.ContextGetUser(ctx)
if !ok {
ul.log.Error().Int("returned statuscode", http.StatusUnauthorized).Msg("user unauthorized")
w.WriteHeader(http.StatusUnauthorized)
return
}
evs, err := ul.GetEvents(r.Context(), u.GetId().GetOpaqueId())
evs, err := ul.GetEvents(ctx, u.GetId().GetOpaqueId())
if err != nil {
ul.log.Error().Err(err).Int("returned statuscode", http.StatusInternalServerError).Msg("get events failed")
w.WriteHeader(http.StatusInternalServerError)
return
}
span.SetAttributes(attribute.KeyValue{
Key: "events",
Value: attribute.IntValue(len(evs)),
})
conv := ul.getConverter(r.Header.Get(HeaderAcceptLanguage))
@@ -61,7 +67,7 @@ func (ul *UserlogService) HandleGetEvents(w http.ResponseWriter, r *http.Request
resp.OCS.Data = append(resp.OCS.Data, noti)
}
glevs, err := ul.GetGlobalEvents()
glevs, err := ul.GetGlobalEvents(ctx)
if err != nil {
ul.log.Error().Err(err).Int("returned statuscode", http.StatusInternalServerError).Msg("get global events failed")
w.WriteHeader(http.StatusInternalServerError)
@@ -85,7 +91,7 @@ func (ul *UserlogService) HandleGetEvents(w http.ResponseWriter, r *http.Request
// HandleSSE is the GET handler for events
func (ul *UserlogService) HandleSSE(w http.ResponseWriter, r *http.Request) {
u, ok := ctx.ContextGetUser(r.Context())
u, ok := revactx.ContextGetUser(r.Context())
if !ok {
ul.log.Error().Msg("sse: no user in context")
w.WriteHeader(http.StatusInternalServerError)
@@ -119,7 +125,7 @@ func (ul *UserlogService) HandlePostGlobalEvent(w http.ResponseWriter, r *http.R
return
}
if err := ul.StoreGlobalEvent(req.Type, req.Data); err != nil {
if err := ul.StoreGlobalEvent(r.Context(), req.Type, req.Data); err != nil {
ul.log.Error().Err(err).Msg("post: error storing global event")
w.WriteHeader(http.StatusInternalServerError)
return
@@ -137,7 +143,7 @@ func (ul *UserlogService) HandleDeleteGlobalEvent(w http.ResponseWriter, r *http
return
}
if err := ul.DeleteGlobalEvents(req.IDs); err != nil {
if err := ul.DeleteGlobalEvents(r.Context(), req.IDs); err != nil {
ul.log.Error().Err(err).Int("returned statuscode", http.StatusInternalServerError).Msg("delete events failed")
w.WriteHeader(http.StatusInternalServerError)
return

View File

@@ -29,9 +29,17 @@ import (
"github.com/r3labs/sse/v2"
micrometadata "go-micro.dev/v4/metadata"
"go-micro.dev/v4/store"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/metadata"
)
var tracer trace.Tracer
func init() {
tracer = otel.Tracer("github.com/owncloud/ocis/services/userlog/pkg/service")
}
// UserlogService is the service responsible for user activities
type UserlogService struct {
log log.Logger
@@ -43,7 +51,6 @@ type UserlogService struct {
valueClient settingssvc.ValueService
sse *sse.Server
registeredEvents map[string]events.Unmarshaller
translationPath string
}
// NewUserlogService returns an EventHistory service
@@ -54,7 +61,7 @@ func NewUserlogService(opts ...Option) (*UserlogService, error) {
}
if o.Consumer == nil || o.Store == nil {
return nil, fmt.Errorf("Need non nil consumer (%v) and store (%v) to work properly", o.Consumer, o.Store)
return nil, fmt.Errorf("need non nil consumer (%v) and store (%v) to work properly", o.Consumer, o.Store)
}
ch, err := events.Consume(o.Consumer, "userlog", o.RegisteredEvents...)
@@ -191,6 +198,8 @@ func (ul *UserlogService) MemorizeEvents(ch <-chan events.Event) {
// GetEvents allows retrieving events from the eventhistory by userid
func (ul *UserlogService) GetEvents(ctx context.Context, userid string) ([]*ehmsg.Event, error) {
ctx, span := tracer.Start(ctx, "GetEvents")
defer span.End()
rec, err := ul.store.Read(userid)
if err != nil && err != store.ErrNotFound {
ul.log.Error().Err(err).Str("userid", userid).Msg("failed to read record from store")
@@ -246,7 +255,9 @@ func (ul *UserlogService) DeleteEvents(userid string, evids []string) error {
}
// StoreGlobalEvent will store a global event that will be returned with each `GetEvents` request
func (ul *UserlogService) StoreGlobalEvent(typ string, data map[string]string) error {
func (ul *UserlogService) StoreGlobalEvent(ctx context.Context, typ string, data map[string]string) error {
ctx, span := tracer.Start(ctx, "StoreGlobalEvent")
defer span.End()
switch typ {
default:
return fmt.Errorf("unknown event type: %s", typ)
@@ -277,7 +288,7 @@ func (ul *UserlogService) StoreGlobalEvent(typ string, data map[string]string) e
return err
}
return ul.alterGlobalEvents(func(evs map[string]json.RawMessage) error {
return ul.alterGlobalEvents(ctx, func(evs map[string]json.RawMessage) error {
evs[typ] = b
return nil
})
@@ -286,7 +297,9 @@ func (ul *UserlogService) StoreGlobalEvent(typ string, data map[string]string) e
}
// GetGlobalEvents will return all global events
func (ul *UserlogService) GetGlobalEvents() (map[string]json.RawMessage, error) {
func (ul *UserlogService) GetGlobalEvents(ctx context.Context) (map[string]json.RawMessage, error) {
_, span := tracer.Start(ctx, "GetGlobalEvents")
defer span.End()
out := make(map[string]json.RawMessage)
recs, err := ul.store.Read(_globalEventsKey)
@@ -304,8 +317,10 @@ func (ul *UserlogService) GetGlobalEvents() (map[string]json.RawMessage, error)
}
// DeleteGlobalEvents will delete the specified event
func (ul *UserlogService) DeleteGlobalEvents(evnames []string) error {
return ul.alterGlobalEvents(func(evs map[string]json.RawMessage) error {
func (ul *UserlogService) DeleteGlobalEvents(ctx context.Context, evnames []string) error {
_, span := tracer.Start(ctx, "DeleteGlobalEvents")
defer span.End()
return ul.alterGlobalEvents(ctx, func(evs map[string]json.RawMessage) error {
for _, name := range evnames {
delete(evs, name)
}
@@ -390,8 +405,10 @@ func (ul *UserlogService) alterUserEventList(userid string, alter func([]string)
})
}
func (ul *UserlogService) alterGlobalEvents(alter func(map[string]json.RawMessage) error) error {
evs, err := ul.GetGlobalEvents()
func (ul *UserlogService) alterGlobalEvents(ctx context.Context, alter func(map[string]json.RawMessage) error) error {
_, span := tracer.Start(ctx, "alterGlobalEvents")
defer span.End()
evs, err := ul.GetGlobalEvents(ctx)
if err != nil && err != store.ErrNotFound {
return err
}
@@ -587,7 +604,7 @@ func getSpace(ctx context.Context, spaceID string, gatewaySelector pool.Selectab
}
if res.GetStatus().GetCode() != rpc.Code_CODE_OK {
return nil, fmt.Errorf("Error while getting space: (%v) %s", res.GetStatus().GetCode(), res.GetStatus().GetMessage())
return nil, fmt.Errorf("error while getting space: (%v) %s", res.GetStatus().GetCode(), res.GetStatus().GetMessage())
}
if len(res.StorageSpaces) == 0 {
@@ -647,7 +664,7 @@ func getResource(ctx context.Context, resourceid *storageprovider.ResourceId, ga
}
if res.GetStatus().GetCode() != rpc.Code_CODE_OK {
return nil, fmt.Errorf("Unexpected status code while getting space: %v", res.GetStatus().GetCode())
return nil, fmt.Errorf("unexpected status code while getting space: %v", res.GetStatus().GetCode())
}
return res.GetInfo(), nil