mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-01-07 04:40:05 -06:00
Merge pull request #7079 from owncloud/ainmosni/tracing/av
Add tracing to antivirus.
This commit is contained in:
@@ -9,6 +9,7 @@ import (
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/handlers"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/log"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/service/debug"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/tracing"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/version"
|
||||
"github.com/owncloud/ocis/v2/services/antivirus/pkg/config"
|
||||
"github.com/owncloud/ocis/v2/services/antivirus/pkg/config/parser"
|
||||
@@ -43,9 +44,12 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
)
|
||||
)
|
||||
defer cancel()
|
||||
|
||||
traceProvider, err := tracing.GetServiceTraceProvider(cfg.Tracing, cfg.Service.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
{
|
||||
svc, err := service.NewAntivirus(cfg, logger)
|
||||
svc, err := service.NewAntivirus(cfg, logger, traceProvider)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -13,6 +13,8 @@ type Config struct {
|
||||
|
||||
Service Service `yaml:"-"`
|
||||
|
||||
Tracing *Tracing `yaml:"tracing"`
|
||||
|
||||
InfectedFileHandling string `yaml:"infected-file-handling" env:"ANTIVIRUS_INFECTED_FILE_HANDLING" desc:"Defines the behaviour when a virus has been found. Supported options are: 'delete', 'continue' and 'abort '. Delete will delete the file. Continue will mark the file as infected but continues further processing. Abort will keep the file in the uploads folder for further admin inspection and will not move it to its final destination."`
|
||||
Events Events
|
||||
Scanner Scanner
|
||||
|
||||
@@ -46,9 +46,12 @@ func EnsureDefaults(cfg *config.Config) {
|
||||
if cfg.Log == nil {
|
||||
cfg.Log = &config.Log{}
|
||||
}
|
||||
|
||||
if cfg.Tracing == nil {
|
||||
cfg.Tracing = &config.Tracing{}
|
||||
}
|
||||
}
|
||||
|
||||
// Sanitize sanitizes the configuration
|
||||
func Sanitize(cfg *config.Config) {
|
||||
|
||||
}
|
||||
|
||||
21
services/antivirus/pkg/config/tracing.go
Normal file
21
services/antivirus/pkg/config/tracing.go
Normal file
@@ -0,0 +1,21 @@
|
||||
package config
|
||||
|
||||
import "github.com/owncloud/ocis/v2/ocis-pkg/tracing"
|
||||
|
||||
// Tracing defines the available tracing configuration.
|
||||
type Tracing struct {
|
||||
Enabled bool `yaml:"enabled" env:"OCIS_TRACING_ENABLED;ANTIVIRUS_TRACING_ENABLED" desc:"Activates tracing."`
|
||||
Type string `yaml:"type" env:"OCIS_TRACING_TYPE;ANTIVIRUS_TRACING_TYPE" desc:"The type of tracing. Defaults to '', which is the same as 'jaeger'. Allowed tracing types are 'jaeger' and '' as of now."`
|
||||
Endpoint string `yaml:"endpoint" env:"OCIS_TRACING_ENDPOINT;ANTIVIRUS_TRACING_ENDPOINT" desc:"The endpoint of the tracing agent."`
|
||||
Collector string `yaml:"collector" env:"OCIS_TRACING_COLLECTOR;ANTIVIRUS_TRACING_COLLECTOR" desc:"The HTTP endpoint for sending spans directly to a collector, i.e. http://jaeger-collector:14268/api/traces. Only used if the tracing endpoint is unset."`
|
||||
}
|
||||
|
||||
// Convert Tracing to the tracing package's Config struct.
|
||||
func (t Tracing) Convert() tracing.Config {
|
||||
return tracing.Config{
|
||||
Enabled: t.Enabled,
|
||||
Type: t.Type,
|
||||
Endpoint: t.Endpoint,
|
||||
Collector: t.Collector,
|
||||
}
|
||||
}
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/x509"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
@@ -18,6 +19,14 @@ import (
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/log"
|
||||
"github.com/owncloud/ocis/v2/services/antivirus/pkg/config"
|
||||
"github.com/owncloud/ocis/v2/services/antivirus/pkg/scanners"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrFatal is returned when a fatal error occurs and we want to exit.
|
||||
ErrFatal = errors.New("fatal error")
|
||||
// ErrEvent is returned when something went wrong with a specific event.
|
||||
ErrEvent = errors.New("event error")
|
||||
)
|
||||
|
||||
// Scanner is an abstraction for the actual virus scan
|
||||
@@ -26,8 +35,8 @@ type Scanner interface {
|
||||
}
|
||||
|
||||
// NewAntivirus returns a service implementation for Service.
|
||||
func NewAntivirus(c *config.Config, l log.Logger) (Antivirus, error) {
|
||||
av := Antivirus{c: c, l: l, client: rhttp.GetHTTPClient(rhttp.Insecure(true))}
|
||||
func NewAntivirus(c *config.Config, l log.Logger, tp trace.TracerProvider) (Antivirus, error) {
|
||||
av := Antivirus{c: c, l: l, tp: tp, client: rhttp.GetHTTPClient(rhttp.Insecure(true))}
|
||||
|
||||
var err error
|
||||
av.s, err = scanners.New(c)
|
||||
@@ -56,18 +65,18 @@ func NewAntivirus(c *config.Config, l log.Logger) (Antivirus, error) {
|
||||
|
||||
// Antivirus defines implements the business logic for Service.
|
||||
type Antivirus struct {
|
||||
c *config.Config
|
||||
l log.Logger
|
||||
s Scanner
|
||||
o events.PostprocessingOutcome
|
||||
m uint64
|
||||
c *config.Config
|
||||
l log.Logger
|
||||
s Scanner
|
||||
o events.PostprocessingOutcome
|
||||
m uint64
|
||||
tp trace.TracerProvider
|
||||
|
||||
client *http.Client
|
||||
}
|
||||
|
||||
// Run runs the service
|
||||
func (av Antivirus) Run() error {
|
||||
ctx := context.Background()
|
||||
evtsCfg := av.c.Events
|
||||
|
||||
var rootCAPool *x509.CertPool
|
||||
@@ -87,81 +96,102 @@ func (av Antivirus) Run() error {
|
||||
evtsCfg.TLSInsecure = false
|
||||
}
|
||||
|
||||
stream, err := stream.NatsFromConfig(av.c.Service.Name, stream.NatsConfig(av.c.Events))
|
||||
natsStream, err := stream.NatsFromConfig(av.c.Service.Name, stream.NatsConfig(av.c.Events))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ch, err := events.Consume(stream, "antivirus", events.StartPostprocessingStep{})
|
||||
ch, err := events.Consume(natsStream, "antivirus", events.StartPostprocessingStep{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for e := range ch {
|
||||
ev := e.Event.(events.StartPostprocessingStep)
|
||||
if ev.StepToStart != events.PPStepAntivirus {
|
||||
continue
|
||||
}
|
||||
|
||||
if av.c.DebugScanOutcome != "" {
|
||||
av.l.Warn().Str("antivir, clamav", ">>>>>>> ANTIVIRUS_DEBUG_SCAN_OUTCOME IS SET NO ACTUAL VIRUS SCAN IS PERFORMED!")
|
||||
if err := events.Publish(ctx, stream, events.PostprocessingStepFinished{
|
||||
FinishedStep: events.PPStepAntivirus,
|
||||
Outcome: events.PostprocessingOutcome(av.c.DebugScanOutcome),
|
||||
UploadID: ev.UploadID,
|
||||
ExecutingUser: ev.ExecutingUser,
|
||||
Filename: ev.Filename,
|
||||
Result: events.VirusscanResult{
|
||||
Infected: true,
|
||||
Description: "DEBUG: forced outcome",
|
||||
Scandate: time.Now(),
|
||||
ResourceID: ev.ResourceID,
|
||||
ErrorMsg: "DEBUG: forced outcome",
|
||||
},
|
||||
}); err != nil {
|
||||
av.l.Fatal().Err(err).Str("uploadid", ev.UploadID).Interface("resourceID", ev.ResourceID).Msg("cannot publish events - exiting")
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
av.l.Debug().Str("uploadid", ev.UploadID).Str("filename", ev.Filename).Msg("Starting virus scan.")
|
||||
var errmsg string
|
||||
res, err := av.process(ev)
|
||||
err := av.processEvent(e, natsStream)
|
||||
if err != nil {
|
||||
errmsg = err.Error()
|
||||
switch {
|
||||
case errors.Is(err, ErrFatal):
|
||||
return err
|
||||
case errors.Is(err, ErrEvent):
|
||||
// Right now logging of these happens in the processEvent method, might be cleaner to do it here.
|
||||
continue
|
||||
default:
|
||||
av.l.Fatal().Err(err).Msg("unknown error - exiting")
|
||||
}
|
||||
}
|
||||
|
||||
var outcome events.PostprocessingOutcome
|
||||
switch {
|
||||
case res.Infected:
|
||||
outcome = av.o
|
||||
case !res.Infected && err == nil:
|
||||
outcome = events.PPOutcomeContinue
|
||||
default:
|
||||
outcome = events.PPOutcomeAbort
|
||||
}
|
||||
}
|
||||
|
||||
av.l.Info().Str("uploadid", ev.UploadID).Interface("resourceID", ev.ResourceID).Str("virus", res.Description).Str("outcome", string(outcome)).Str("filename", ev.Filename).Str("user", ev.ExecutingUser.GetId().GetOpaqueId()).Bool("infected", res.Infected).Msg("File scanned")
|
||||
if err := events.Publish(ctx, stream, events.PostprocessingStepFinished{
|
||||
return nil
|
||||
}
|
||||
|
||||
func (av Antivirus) processEvent(e events.Event, s events.Publisher) error {
|
||||
ctx := e.GetTraceContext(context.Background())
|
||||
ctx, span := av.tp.Tracer("antivirus").Start(ctx, "processEvent")
|
||||
defer span.End()
|
||||
av.l.Info().Str("traceID", span.SpanContext().TraceID().String()).Msg("TraceID")
|
||||
ev := e.Event.(events.StartPostprocessingStep)
|
||||
if ev.StepToStart != events.PPStepAntivirus {
|
||||
return nil
|
||||
}
|
||||
|
||||
if av.c.DebugScanOutcome != "" {
|
||||
av.l.Warn().Str("antivir, clamav", ">>>>>>> ANTIVIRUS_DEBUG_SCAN_OUTCOME IS SET NO ACTUAL VIRUS SCAN IS PERFORMED!")
|
||||
if err := events.Publish(ctx, s, events.PostprocessingStepFinished{
|
||||
FinishedStep: events.PPStepAntivirus,
|
||||
Outcome: outcome,
|
||||
Outcome: events.PostprocessingOutcome(av.c.DebugScanOutcome),
|
||||
UploadID: ev.UploadID,
|
||||
ExecutingUser: ev.ExecutingUser,
|
||||
Filename: ev.Filename,
|
||||
Result: events.VirusscanResult{
|
||||
Infected: res.Infected,
|
||||
Description: res.Description,
|
||||
Infected: true,
|
||||
Description: "DEBUG: forced outcome",
|
||||
Scandate: time.Now(),
|
||||
ResourceID: ev.ResourceID,
|
||||
ErrorMsg: errmsg,
|
||||
ErrorMsg: "DEBUG: forced outcome",
|
||||
},
|
||||
}); err != nil {
|
||||
av.l.Fatal().Err(err).Str("uploadid", ev.UploadID).Interface("resourceID", ev.ResourceID).Msg("cannot publish events - exiting")
|
||||
return err
|
||||
return fmt.Errorf("%w: cannot publish events", ErrFatal)
|
||||
}
|
||||
return fmt.Errorf("%w: no actual virus scan performed", ErrEvent)
|
||||
}
|
||||
|
||||
av.l.Debug().Str("uploadid", ev.UploadID).Str("filename", ev.Filename).Msg("Starting virus scan.")
|
||||
var errmsg string
|
||||
res, err := av.process(ev)
|
||||
if err != nil {
|
||||
errmsg = err.Error()
|
||||
}
|
||||
|
||||
var outcome events.PostprocessingOutcome
|
||||
switch {
|
||||
case res.Infected:
|
||||
outcome = av.o
|
||||
case !res.Infected && err == nil:
|
||||
outcome = events.PPOutcomeContinue
|
||||
default:
|
||||
outcome = events.PPOutcomeAbort
|
||||
}
|
||||
|
||||
av.l.Info().Str("uploadid", ev.UploadID).Interface("resourceID", ev.ResourceID).Str("virus", res.Description).Str("outcome", string(outcome)).Str("filename", ev.Filename).Str("user", ev.ExecutingUser.GetId().GetOpaqueId()).Bool("infected", res.Infected).Msg("File scanned")
|
||||
if err := events.Publish(ctx, s, events.PostprocessingStepFinished{
|
||||
FinishedStep: events.PPStepAntivirus,
|
||||
Outcome: outcome,
|
||||
UploadID: ev.UploadID,
|
||||
ExecutingUser: ev.ExecutingUser,
|
||||
Filename: ev.Filename,
|
||||
Result: events.VirusscanResult{
|
||||
Infected: res.Infected,
|
||||
Description: res.Description,
|
||||
Scandate: time.Now(),
|
||||
ResourceID: ev.ResourceID,
|
||||
ErrorMsg: errmsg,
|
||||
},
|
||||
}); err != nil {
|
||||
av.l.Fatal().Err(err).Str("uploadid", ev.UploadID).Interface("resourceID", ev.ResourceID).Msg("cannot publish events - exiting")
|
||||
return fmt.Errorf("%w: %s", ErrFatal, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -55,6 +55,8 @@ func (pps *PostprocessingService) Run() error {
|
||||
err error
|
||||
)
|
||||
|
||||
ctx = e.GetTraceContext(ctx)
|
||||
|
||||
switch ev := e.Event.(type) {
|
||||
case events.BytesReceived:
|
||||
pp = postprocessing.New(ev.UploadID, ev.URL, ev.ExecutingUser, ev.Filename, ev.Filesize, ev.ResourceID, pps.steps, pps.c.Delayprocessing)
|
||||
|
||||
Reference in New Issue
Block a user