Add tracing to postprocessing service (#7094)

This adds tracing to the postprocessing service. The service-specific Tracing configuration moves into its own file and gains a Convert method that maps it onto the shared ocis-pkg/tracing config. The server command now builds a trace provider via tracing.GetServiceTraceProvider and passes it, together with a context, into NewPostprocessingService. The event loop in Run is split out into a processEvent method, which restores the trace context carried by each event, wraps handling in a "processEvent" span, and reports failures through two new sentinel errors: errEvent (log and continue with the next event) and errFatal (stop the service).
Authored by Daniël Franke on 2023-08-23 15:52:26 +02:00, committed by GitHub
parent 49fc22d532
commit 97034f4aaa
4 changed files with 120 additions and 73 deletions
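For illustration, a minimal sketch of the span handling this commit introduces, using only calls visible in the diff below (the handle helper and the no-op provider are stand-ins, not part of the commit): the provider comes from the tracing config, the event's context is restored, and the work runs inside a span.

package main

import (
    "context"
    "fmt"

    "go.opentelemetry.io/otel/trace"
)

// handle mirrors the shape of the new processEvent: take a context derived
// from the event's trace context and wrap the actual work in a span from
// the injected TracerProvider.
func handle(ctx context.Context, tp trace.TracerProvider, work func(context.Context) error) error {
    ctx, span := tp.Tracer("postprocessing").Start(ctx, "processEvent")
    defer span.End()
    return work(ctx)
}

func main() {
    // A no-op provider stands in for the one the server command builds via
    // tracing.GetServiceTraceProvider(cfg.Tracing, cfg.Service.Name).
    tp := trace.NewNoopTracerProvider()
    _ = handle(context.Background(), tp, func(ctx context.Context) error {
        fmt.Println("postprocessing work runs inside a span")
        return nil
    })
}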

View File

@@ -10,6 +10,7 @@ import (
"github.com/oklog/run"
"github.com/owncloud/ocis/v2/ocis-pkg/handlers"
"github.com/owncloud/ocis/v2/ocis-pkg/service/debug"
"github.com/owncloud/ocis/v2/ocis-pkg/tracing"
"github.com/owncloud/ocis/v2/ocis-pkg/version"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config/parser"
@@ -47,6 +48,11 @@ func Server(cfg *config.Config) *cli.Command {
)
defer cancel()
traceProvider, err := tracing.GetServiceTraceProvider(cfg.Tracing, cfg.Service.Name)
if err != nil {
return err
}
{
bus, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Postprocessing.Events))
if err != nil {
@@ -62,7 +68,7 @@ func Server(cfg *config.Config) *cli.Command {
microstore.Table(cfg.Store.Table),
)
svc, err := service.NewPostprocessingService(bus, logger, st, cfg.Postprocessing)
svc, err := service.NewPostprocessingService(ctx, bus, logger, st, traceProvider, cfg.Postprocessing)
if err != nil {
return err
}

View File

@@ -48,14 +48,6 @@ type Debug struct {
Zpages bool `yaml:"zpages" env:"POSTPROCESSING_DEBUG_ZPAGES" desc:"Enables zpages, which can be used for collecting and viewing in-memory traces."`
}
// Tracing defines the available tracing configuration.
type Tracing struct {
Enabled bool `yaml:"enabled" env:"OCIS_TRACING_ENABLED;POSTPROCESSING_TRACING_ENABLED" desc:"Activates tracing."`
Type string `yaml:"type" env:"OCIS_TRACING_TYPE;POSTPROCESSING_TRACING_TYPE" desc:"The type of tracing. Defaults to '', which is the same as 'jaeger'. Allowed tracing types are 'jaeger' and '' as of now."`
Endpoint string `yaml:"endpoint" env:"OCIS_TRACING_ENDPOINT;POSTPROCESSING_TRACING_ENDPOINT" desc:"The endpoint of the tracing agent."`
Collector string `yaml:"collector" env:"OCIS_TRACING_COLLECTOR;POSTPROCESSING_TRACING_COLLECTOR" desc:"The HTTP endpoint for sending spans directly to a collector, i.e. http://jaeger-collector:14268/api/traces. Only used if the tracing endpoint is unset."`
}
// Store configures the store to use
type Store struct {
Store string `yaml:"store" env:"OCIS_PERSISTENT_STORE;POSTPROCESSING_STORE" desc:"The type of the store. Supported values are: 'memory', 'ocmem', 'etcd', 'redis', 'redis-sentinel', 'nats-js', 'noop'. See the text description for details."`

View File

@@ -0,0 +1,21 @@
package config
import "github.com/owncloud/ocis/v2/ocis-pkg/tracing"
// Tracing defines the available tracing configuration.
type Tracing struct {
Enabled bool `yaml:"enabled" env:"OCIS_TRACING_ENABLED;POSTPROCESSING_TRACING_ENABLED" desc:"Activates tracing."`
Type string `yaml:"type" env:"OCIS_TRACING_TYPE;POSTPROCESSING_TRACING_TYPE" desc:"The type of tracing. Defaults to '', which is the same as 'jaeger'. Allowed tracing types are 'jaeger' and '' as of now."`
Endpoint string `yaml:"endpoint" env:"OCIS_TRACING_ENDPOINT;POSTPROCESSING_TRACING_ENDPOINT" desc:"The endpoint of the tracing agent."`
Collector string `yaml:"collector" env:"OCIS_TRACING_COLLECTOR;POSTPROCESSING_TRACING_COLLECTOR" desc:"The HTTP endpoint for sending spans directly to a collector, i.e. http://jaeger-collector:14268/api/traces. Only used if the tracing endpoint is unset."`
}
// Convert Tracing to the tracing package's Config struct.
func (t Tracing) Convert() tracing.Config {
return tracing.Config{
Enabled: t.Enabled,
Type: t.Type,
Endpoint: t.Endpoint,
Collector: t.Collector,
}
}
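A minimal, hypothetical test sketch for the new Convert helper (not part of the commit; it would sit next to this file in package config and only checks that the four fields are carried over):

package config

import "testing"

func TestTracingConvert(t *testing.T) {
    in := Tracing{
        Enabled:   true,
        Type:      "jaeger",
        Endpoint:  "localhost:6831",
        Collector: "http://jaeger-collector:14268/api/traces",
    }
    got := in.Convert()
    // The returned tracing.Config should mirror the service-level settings.
    if !got.Enabled || got.Type != in.Type || got.Endpoint != in.Endpoint || got.Collector != in.Collector {
        t.Fatalf("unexpected conversion result: %+v", got)
    }
}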

View File

@@ -3,6 +3,7 @@ package service
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/cs3org/reva/v2/pkg/events"
@@ -10,20 +11,30 @@ import (
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/postprocessing"
"go-micro.dev/v4/store"
"go.opentelemetry.io/otel/trace"
)
// PostprocessingService is an instance of the service handling postprocessing of files
type PostprocessingService struct {
ctx context.Context
log log.Logger
events <-chan events.Event
pub events.Publisher
steps []events.Postprocessingstep
store store.Store
c config.Postprocessing
tp trace.TracerProvider
}
var (
// errFatal is returned when a fatal error occurs and we want to exit.
errFatal = errors.New("fatal error")
// errEvent is returned when something went wrong with a specific event.
errEvent = errors.New("event error")
)
// NewPostprocessingService returns a new instance of a postprocessing service
func NewPostprocessingService(stream events.Stream, logger log.Logger, sto store.Store, c config.Postprocessing) (*PostprocessingService, error) {
func NewPostprocessingService(ctx context.Context, stream events.Stream, logger log.Logger, sto store.Store, tp trace.TracerProvider, c config.Postprocessing) (*PostprocessingService, error) {
evs, err := events.Consume(stream, "postprocessing",
events.BytesReceived{},
events.StartPostprocessingStep{},
@@ -36,89 +47,106 @@ func NewPostprocessingService(stream events.Stream, logger log.Logger, sto store
}
return &PostprocessingService{
ctx: ctx,
log: logger,
events: evs,
pub: stream,
steps: getSteps(c),
store: sto,
c: c,
tp: tp,
}, nil
}
// Run to fulfil Runner interface
func (pps *PostprocessingService) Run() error {
ctx := context.Background()
for e := range pps.events {
var (
next interface{}
pp *postprocessing.Postprocessing
err error
)
err := pps.processEvent(e)
if err != nil {
switch {
case errors.Is(err, errFatal):
return err
case errors.Is(err, errEvent):
continue
default:
pps.log.Fatal().Err(err).Msg("unknown error - exiting")
}
}
}
return nil
}
ctx = e.GetTraceContext(ctx)
func (pps *PostprocessingService) processEvent(e events.Event) error {
var (
next interface{}
pp *postprocessing.Postprocessing
err error
)
switch ev := e.Event.(type) {
case events.BytesReceived:
pp = postprocessing.New(ev.UploadID, ev.URL, ev.ExecutingUser, ev.Filename, ev.Filesize, ev.ResourceID, pps.steps, pps.c.Delayprocessing)
next = pp.Init(ev)
case events.PostprocessingStepFinished:
if ev.UploadID == "" {
// no current upload - this was an on demand scan
continue
}
pp, err = getPP(pps.store, ev.UploadID)
if err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
continue
}
next = pp.NextStep(ev)
case events.StartPostprocessingStep:
if ev.StepToStart != events.PPStepDelay {
continue
}
pp, err = getPP(pps.store, ev.UploadID)
if err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
continue
}
next = pp.Delay(ev)
case events.UploadReady:
// the storage provider thinks the upload is done - so no need to keep it any more
if err := pps.store.Delete(ev.UploadID); err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot delete upload")
continue
}
case events.ResumePostprocessing:
pp, err = getPP(pps.store, ev.UploadID)
if err != nil {
if err == store.ErrNotFound {
if err := events.Publish(ctx, pps.pub, events.RestartPostprocessing{
UploadID: ev.UploadID,
Timestamp: ev.Timestamp,
}); err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot publish RestartPostprocessing event")
}
continue
ctx := e.GetTraceContext(pps.ctx)
ctx, span := pps.tp.Tracer("postprocessing").Start(ctx, "processEvent")
defer span.End()
switch ev := e.Event.(type) {
case events.BytesReceived:
pp = postprocessing.New(ev.UploadID, ev.URL, ev.ExecutingUser, ev.Filename, ev.Filesize, ev.ResourceID, pps.steps, pps.c.Delayprocessing)
next = pp.Init(ev)
case events.PostprocessingStepFinished:
if ev.UploadID == "" {
// no current upload - this was an on demand scan
return nil
}
pp, err = getPP(pps.store, ev.UploadID)
if err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
return fmt.Errorf("%w: cannot get upload", errEvent)
}
next = pp.NextStep(ev)
case events.StartPostprocessingStep:
if ev.StepToStart != events.PPStepDelay {
return nil
}
pp, err = getPP(pps.store, ev.UploadID)
if err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
return fmt.Errorf("%w: cannot get upload", errEvent)
}
next = pp.Delay(ev)
case events.UploadReady:
// the storage provider thinks the upload is done - so no need to keep it any more
if err := pps.store.Delete(ev.UploadID); err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot delete upload")
return fmt.Errorf("%w: cannot delete upload", errEvent)
}
case events.ResumePostprocessing:
pp, err = getPP(pps.store, ev.UploadID)
if err != nil {
if err == store.ErrNotFound {
if err := events.Publish(ctx, pps.pub, events.RestartPostprocessing{
UploadID: ev.UploadID,
Timestamp: ev.Timestamp,
}); err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot publish RestartPostprocessing event")
}
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
continue
return fmt.Errorf("%w: cannot publish RestartPostprocessing event", errEvent)
}
next = pp.CurrentStep()
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
return fmt.Errorf("%w: cannot get upload", errEvent)
}
next = pp.CurrentStep()
}
if pp != nil {
if err := storePP(pps.store, pp); err != nil {
pps.log.Error().Str("uploadID", pp.ID).Err(err).Msg("cannot store upload")
continue // TODO: should we really continue here?
}
if pp != nil {
if err := storePP(pps.store, pp); err != nil {
pps.log.Error().Str("uploadID", pp.ID).Err(err).Msg("cannot store upload")
return fmt.Errorf("%w: cannot store upload", errEvent)
}
if next != nil {
if err := events.Publish(ctx, pps.pub, next); err != nil {
pps.log.Error().Err(err).Msg("unable to publish event")
return err // we can't publish -> we are screwed
}
}
if next != nil {
if err := events.Publish(ctx, pps.pub, next); err != nil {
pps.log.Error().Err(err).Msg("unable to publish event")
return fmt.Errorf("%w: unable to publish event", errFatal) // we can't publish -> we are screwed
}
}
return nil
}
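As a side note, Run's error classification relies on errors.Is seeing through the fmt.Errorf("%w: ...") wrapping done in processEvent. A small self-contained sketch of that pattern (the process function is illustrative only; the sentinel names are the ones added in this commit):

package main

import (
    "errors"
    "fmt"
)

var (
    errFatal = errors.New("fatal error")
    errEvent = errors.New("event error")
)

// process stands in for processEvent: it wraps a sentinel so callers can
// classify the failure without losing the detail message.
func process(mode string) error {
    switch mode {
    case "event":
        return fmt.Errorf("%w: cannot get upload", errEvent)
    case "fatal":
        return fmt.Errorf("%w: unable to publish event", errFatal)
    }
    return nil
}

func main() {
    for _, mode := range []string{"ok", "event", "fatal"} {
        err := process(mode)
        switch {
        case err == nil:
            fmt.Println("handled") // Run() moves on to the next event
        case errors.Is(err, errEvent):
            fmt.Println("skipping event:", err) // Run() continues the loop
        case errors.Is(err, errFatal):
            fmt.Println("shutting down:", err) // Run() returns the error
        }
    }
}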