diff --git a/services/antivirus/pkg/command/server.go b/services/antivirus/pkg/command/server.go index 257ab6c2b7..32e2fd8b44 100644 --- a/services/antivirus/pkg/command/server.go +++ b/services/antivirus/pkg/command/server.go @@ -9,6 +9,7 @@ import ( "github.com/owncloud/ocis/v2/ocis-pkg/handlers" "github.com/owncloud/ocis/v2/ocis-pkg/log" "github.com/owncloud/ocis/v2/ocis-pkg/service/debug" + "github.com/owncloud/ocis/v2/ocis-pkg/tracing" "github.com/owncloud/ocis/v2/ocis-pkg/version" "github.com/owncloud/ocis/v2/services/antivirus/pkg/config" "github.com/owncloud/ocis/v2/services/antivirus/pkg/config/parser" @@ -43,9 +44,12 @@ func Server(cfg *config.Config) *cli.Command { ) ) defer cancel() - + traceProvider, err := tracing.GetServiceTraceProvider(cfg.Tracing, cfg.Service.Name) + if err != nil { + return err + } { - svc, err := service.NewAntivirus(cfg, logger) + svc, err := service.NewAntivirus(cfg, logger, traceProvider) if err != nil { return err } diff --git a/services/antivirus/pkg/config/config.go b/services/antivirus/pkg/config/config.go index e884ad70a3..a5ab982efb 100644 --- a/services/antivirus/pkg/config/config.go +++ b/services/antivirus/pkg/config/config.go @@ -13,6 +13,8 @@ type Config struct { Service Service `yaml:"-"` + Tracing *Tracing `yaml:"tracing"` + InfectedFileHandling string `yaml:"infected-file-handling" env:"ANTIVIRUS_INFECTED_FILE_HANDLING" desc:"Defines the behaviour when a virus has been found. Supported options are: 'delete', 'continue' and 'abort '. Delete will delete the file. Continue will mark the file as infected but continues further processing. Abort will keep the file in the uploads folder for further admin inspection and will not move it to its final destination."` Events Events Scanner Scanner diff --git a/services/antivirus/pkg/config/defaults/defaultconfig.go b/services/antivirus/pkg/config/defaults/defaultconfig.go index 90bc649a34..75b761570a 100644 --- a/services/antivirus/pkg/config/defaults/defaultconfig.go +++ b/services/antivirus/pkg/config/defaults/defaultconfig.go @@ -46,9 +46,12 @@ func EnsureDefaults(cfg *config.Config) { if cfg.Log == nil { cfg.Log = &config.Log{} } + + if cfg.Tracing == nil { + cfg.Tracing = &config.Tracing{} + } } // Sanitize sanitizes the configuration func Sanitize(cfg *config.Config) { - } diff --git a/services/antivirus/pkg/config/tracing.go b/services/antivirus/pkg/config/tracing.go new file mode 100644 index 0000000000..dfa8bca314 --- /dev/null +++ b/services/antivirus/pkg/config/tracing.go @@ -0,0 +1,21 @@ +package config + +import "github.com/owncloud/ocis/v2/ocis-pkg/tracing" + +// Tracing defines the available tracing configuration. +type Tracing struct { + Enabled bool `yaml:"enabled" env:"OCIS_TRACING_ENABLED;ANTIVIRUS_TRACING_ENABLED" desc:"Activates tracing."` + Type string `yaml:"type" env:"OCIS_TRACING_TYPE;ANTIVIRUS_TRACING_TYPE" desc:"The type of tracing. Defaults to '', which is the same as 'jaeger'. Allowed tracing types are 'jaeger' and '' as of now."` + Endpoint string `yaml:"endpoint" env:"OCIS_TRACING_ENDPOINT;ANTIVIRUS_TRACING_ENDPOINT" desc:"The endpoint of the tracing agent."` + Collector string `yaml:"collector" env:"OCIS_TRACING_COLLECTOR;ANTIVIRUS_TRACING_COLLECTOR" desc:"The HTTP endpoint for sending spans directly to a collector, i.e. http://jaeger-collector:14268/api/traces. Only used if the tracing endpoint is unset."` +} + +// Convert Tracing to the tracing package's Config struct. +func (t Tracing) Convert() tracing.Config { + return tracing.Config{ + Enabled: t.Enabled, + Type: t.Type, + Endpoint: t.Endpoint, + Collector: t.Collector, + } +} diff --git a/services/antivirus/pkg/service/service.go b/services/antivirus/pkg/service/service.go index ac03dbcf99..948be0c77a 100644 --- a/services/antivirus/pkg/service/service.go +++ b/services/antivirus/pkg/service/service.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "crypto/x509" + "errors" "fmt" "io" "net/http" @@ -18,6 +19,14 @@ import ( "github.com/owncloud/ocis/v2/ocis-pkg/log" "github.com/owncloud/ocis/v2/services/antivirus/pkg/config" "github.com/owncloud/ocis/v2/services/antivirus/pkg/scanners" + "go.opentelemetry.io/otel/trace" +) + +var ( + // ErrFatal is returned when a fatal error occurs and we want to exit. + ErrFatal = errors.New("fatal error") + // ErrEvent is returned when something went wrong with a specific event. + ErrEvent = errors.New("event error") ) // Scanner is an abstraction for the actual virus scan @@ -26,8 +35,8 @@ type Scanner interface { } // NewAntivirus returns a service implementation for Service. -func NewAntivirus(c *config.Config, l log.Logger) (Antivirus, error) { - av := Antivirus{c: c, l: l, client: rhttp.GetHTTPClient(rhttp.Insecure(true))} +func NewAntivirus(c *config.Config, l log.Logger, tp trace.TracerProvider) (Antivirus, error) { + av := Antivirus{c: c, l: l, tp: tp, client: rhttp.GetHTTPClient(rhttp.Insecure(true))} var err error av.s, err = scanners.New(c) @@ -56,18 +65,18 @@ func NewAntivirus(c *config.Config, l log.Logger) (Antivirus, error) { // Antivirus defines implements the business logic for Service. type Antivirus struct { - c *config.Config - l log.Logger - s Scanner - o events.PostprocessingOutcome - m uint64 + c *config.Config + l log.Logger + s Scanner + o events.PostprocessingOutcome + m uint64 + tp trace.TracerProvider client *http.Client } // Run runs the service func (av Antivirus) Run() error { - ctx := context.Background() evtsCfg := av.c.Events var rootCAPool *x509.CertPool @@ -87,81 +96,102 @@ func (av Antivirus) Run() error { evtsCfg.TLSInsecure = false } - stream, err := stream.NatsFromConfig(av.c.Service.Name, stream.NatsConfig(av.c.Events)) + natsStream, err := stream.NatsFromConfig(av.c.Service.Name, stream.NatsConfig(av.c.Events)) if err != nil { return err } - ch, err := events.Consume(stream, "antivirus", events.StartPostprocessingStep{}) + ch, err := events.Consume(natsStream, "antivirus", events.StartPostprocessingStep{}) if err != nil { return err } for e := range ch { - ev := e.Event.(events.StartPostprocessingStep) - if ev.StepToStart != events.PPStepAntivirus { - continue - } - - if av.c.DebugScanOutcome != "" { - av.l.Warn().Str("antivir, clamav", ">>>>>>> ANTIVIRUS_DEBUG_SCAN_OUTCOME IS SET NO ACTUAL VIRUS SCAN IS PERFORMED!") - if err := events.Publish(ctx, stream, events.PostprocessingStepFinished{ - FinishedStep: events.PPStepAntivirus, - Outcome: events.PostprocessingOutcome(av.c.DebugScanOutcome), - UploadID: ev.UploadID, - ExecutingUser: ev.ExecutingUser, - Filename: ev.Filename, - Result: events.VirusscanResult{ - Infected: true, - Description: "DEBUG: forced outcome", - Scandate: time.Now(), - ResourceID: ev.ResourceID, - ErrorMsg: "DEBUG: forced outcome", - }, - }); err != nil { - av.l.Fatal().Err(err).Str("uploadid", ev.UploadID).Interface("resourceID", ev.ResourceID).Msg("cannot publish events - exiting") - return err - } - continue - } - - av.l.Debug().Str("uploadid", ev.UploadID).Str("filename", ev.Filename).Msg("Starting virus scan.") - var errmsg string - res, err := av.process(ev) + err := av.processEvent(e, natsStream) if err != nil { - errmsg = err.Error() + switch { + case errors.Is(err, ErrFatal): + return err + case errors.Is(err, ErrEvent): + // Right now logging of these happens in the processEvent method, might be cleaner to do it here. + continue + default: + av.l.Fatal().Err(err).Msg("unknown error - exiting") + } } - var outcome events.PostprocessingOutcome - switch { - case res.Infected: - outcome = av.o - case !res.Infected && err == nil: - outcome = events.PPOutcomeContinue - default: - outcome = events.PPOutcomeAbort - } + } - av.l.Info().Str("uploadid", ev.UploadID).Interface("resourceID", ev.ResourceID).Str("virus", res.Description).Str("outcome", string(outcome)).Str("filename", ev.Filename).Str("user", ev.ExecutingUser.GetId().GetOpaqueId()).Bool("infected", res.Infected).Msg("File scanned") - if err := events.Publish(ctx, stream, events.PostprocessingStepFinished{ + return nil +} + +func (av Antivirus) processEvent(e events.Event, s events.Publisher) error { + ctx := e.GetTraceContext(context.Background()) + ctx, span := av.tp.Tracer("antivirus").Start(ctx, "processEvent") + defer span.End() + av.l.Info().Str("traceID", span.SpanContext().TraceID().String()).Msg("TraceID") + ev := e.Event.(events.StartPostprocessingStep) + if ev.StepToStart != events.PPStepAntivirus { + return nil + } + + if av.c.DebugScanOutcome != "" { + av.l.Warn().Str("antivir, clamav", ">>>>>>> ANTIVIRUS_DEBUG_SCAN_OUTCOME IS SET NO ACTUAL VIRUS SCAN IS PERFORMED!") + if err := events.Publish(ctx, s, events.PostprocessingStepFinished{ FinishedStep: events.PPStepAntivirus, - Outcome: outcome, + Outcome: events.PostprocessingOutcome(av.c.DebugScanOutcome), UploadID: ev.UploadID, ExecutingUser: ev.ExecutingUser, Filename: ev.Filename, Result: events.VirusscanResult{ - Infected: res.Infected, - Description: res.Description, + Infected: true, + Description: "DEBUG: forced outcome", Scandate: time.Now(), ResourceID: ev.ResourceID, - ErrorMsg: errmsg, + ErrorMsg: "DEBUG: forced outcome", }, }); err != nil { av.l.Fatal().Err(err).Str("uploadid", ev.UploadID).Interface("resourceID", ev.ResourceID).Msg("cannot publish events - exiting") - return err + return fmt.Errorf("%w: cannot publish events", ErrFatal) } + return fmt.Errorf("%w: no actual virus scan performed", ErrEvent) } + av.l.Debug().Str("uploadid", ev.UploadID).Str("filename", ev.Filename).Msg("Starting virus scan.") + var errmsg string + res, err := av.process(ev) + if err != nil { + errmsg = err.Error() + } + + var outcome events.PostprocessingOutcome + switch { + case res.Infected: + outcome = av.o + case !res.Infected && err == nil: + outcome = events.PPOutcomeContinue + default: + outcome = events.PPOutcomeAbort + } + + av.l.Info().Str("uploadid", ev.UploadID).Interface("resourceID", ev.ResourceID).Str("virus", res.Description).Str("outcome", string(outcome)).Str("filename", ev.Filename).Str("user", ev.ExecutingUser.GetId().GetOpaqueId()).Bool("infected", res.Infected).Msg("File scanned") + if err := events.Publish(ctx, s, events.PostprocessingStepFinished{ + FinishedStep: events.PPStepAntivirus, + Outcome: outcome, + UploadID: ev.UploadID, + ExecutingUser: ev.ExecutingUser, + Filename: ev.Filename, + Result: events.VirusscanResult{ + Infected: res.Infected, + Description: res.Description, + Scandate: time.Now(), + ResourceID: ev.ResourceID, + ErrorMsg: errmsg, + }, + }); err != nil { + av.l.Fatal().Err(err).Str("uploadid", ev.UploadID).Interface("resourceID", ev.ResourceID).Msg("cannot publish events - exiting") + return fmt.Errorf("%w: %s", ErrFatal, err) + } return nil } diff --git a/services/postprocessing/pkg/service/service.go b/services/postprocessing/pkg/service/service.go index 2f261dfca1..01c264dbd8 100644 --- a/services/postprocessing/pkg/service/service.go +++ b/services/postprocessing/pkg/service/service.go @@ -55,6 +55,8 @@ func (pps *PostprocessingService) Run() error { err error ) + ctx = e.GetTraceContext(ctx) + switch ev := e.Event.(type) { case events.BytesReceived: pp = postprocessing.New(ev.UploadID, ev.URL, ev.ExecutingUser, ev.Filename, ev.Filesize, ev.ResourceID, pps.steps, pps.c.Delayprocessing)