diff --git a/changelog/unreleased/antivirus-workers.md b/changelog/unreleased/antivirus-workers.md new file mode 100644 index 0000000000..6546284275 --- /dev/null +++ b/changelog/unreleased/antivirus-workers.md @@ -0,0 +1,5 @@ +Bugfix: make antivirus workers configurable + +We made the number of goroutines that pull events from the queue configurable. + +https://github.com/owncloud/ocis/pull/10383 diff --git a/services/antivirus/README.md b/services/antivirus/README.md index 09a3352298..caf7e99004 100644 --- a/services/antivirus/README.md +++ b/services/antivirus/README.md @@ -15,6 +15,9 @@ The antivirus service currently supports [ICAP](https://tools.ietf.org/html/rfc3 Several factors can make it necessary to limit the maximum filesize the antivirus service will use for scanning. Use the `ANTIVIRUS_MAX_SCAN_SIZE` environment variable to scan only a given amount of bytes. Obviously, it is recommended to scan the whole file, but several factors like scanner type and version, bandwidth, performance issues, etc. might make a limit necessary. +> [!CAUTION] +> Streaming of files to the virus scan service still [needs to be implemented](https://github.com/owncloud/ocis/issues/6803). To prevent OOM errors, `ANTIVIRUS_MAX_SCAN_SIZE` needs to be set lower than the available RAM. + ### Infected File Handling The antivirus service allows three different ways of handling infected files. Those can be set via the `ANTIVIRUS_INFECTED_FILE_HANDLING` environment variable: @@ -36,3 +39,9 @@ The antivirus service can scan files during `postprocessing`. `on demand` scanni ### Postprocessing The antivirus service will scan files during postprocessing. It listens for a postprocessing step called `virusscan`. This step can be added in the environment variable `POSTPROCESSING_STEPS`. Read the documentation of the [postprocessing service](https://github.com/owncloud/ocis/tree/master/services/postprocessing) for more details.
+ +The number of concurrent scans can be increased by setting `ANTIVIRUS_WORKERS`, but be aware that this will also increase the memory usage. + +### Scaling in Kubernetes + +In Kubernetes, `ANTIVIRUS_WORKERS` and `ANTIVIRUS_MAX_SCAN_SIZE` can be used to trigger the horizontal pod autoscaler by requesting a memory size that is below `ANTIVIRUS_MAX_SCAN_SIZE`. Keep in mind that each of the `ANTIVIRUS_WORKERS` goroutines might hold up to `ANTIVIRUS_MAX_SCAN_SIZE` of memory. diff --git a/services/antivirus/pkg/config/config.go b/services/antivirus/pkg/config/config.go index fd71d3fd24..3d58dcc053 100644 --- a/services/antivirus/pkg/config/config.go +++ b/services/antivirus/pkg/config/config.go @@ -10,7 +10,7 @@ type Config struct { File string Log *Log - Debug Debug `mask:"struct" yaml:"debug"` + Debug Debug `yaml:"debug" mask:"struct"` Service Service `yaml:"-"` @@ -18,10 +18,12 @@ type Config struct { InfectedFileHandling string `yaml:"infected-file-handling" env:"ANTIVIRUS_INFECTED_FILE_HANDLING" desc:"Defines the behaviour when a virus has been found. Supported options are: 'delete', 'continue' and 'abort '. Delete will delete the file. Continue will mark the file as infected but continues further processing. Abort will keep the file in the uploads folder for further admin inspection and will not move it to its final destination." introductionVersion:"pre5.0"` Events Events - Scanner Scanner - MaxScanSize string `yaml:"max-scan-size" env:"ANTIVIRUS_MAX_SCAN_SIZE" desc:"The maximum scan size the virus scanner can handle. Only this many bytes of a file will be scanned. 0 means unlimited and is the default. Usable common abbreviations: [KB, KiB, MB, MiB, GB, GiB, TB, TiB, PB, PiB, EB, EiB], example: 2GB." introductionVersion:"pre5.0"` + Workers int `yaml:"workers" env:"ANTIVIRUS_WORKERS" desc:"The number of concurrent go routines that fetch events from the event queue."
introductionVersion:"%%NEXT%%"` - Context context.Context `yaml:"-" json:"-"` + Scanner Scanner + MaxScanSize string `yaml:"max-scan-size" env:"ANTIVIRUS_MAX_SCAN_SIZE" desc:"The maximum scan size the virus scanner can handle. Only this many bytes of a file will be scanned. 0 means unlimited and is the default. Usable common abbreviations: [KB, KiB, MB, MiB, GB, GiB, TB, TiB, PB, PiB, EB, EiB], example: 2GB." introductionVersion:"pre5.0"` + + Context context.Context `json:"-" yaml:"-"` DebugScanOutcome string `yaml:"-" env:"ANTIVIRUS_DEBUG_SCAN_OUTCOME" desc:"A predefined outcome for virus scanning, FOR DEBUG PURPOSES ONLY! (example values: 'found,infected')" introductionVersion:"pre5.0"` } diff --git a/services/antivirus/pkg/config/defaults/defaultconfig.go b/services/antivirus/pkg/config/defaults/defaultconfig.go index 9d6291033f..24be86e502 100644 --- a/services/antivirus/pkg/config/defaults/defaultconfig.go +++ b/services/antivirus/pkg/config/defaults/defaultconfig.go @@ -28,6 +28,7 @@ func DefaultConfig() *config.Config { Endpoint: "127.0.0.1:9233", Cluster: "ocis-cluster", }, + Workers: 10, InfectedFileHandling: "delete", Scanner: config.Scanner{ Type: "clamav", diff --git a/services/antivirus/pkg/scanners/icap.go b/services/antivirus/pkg/scanners/icap.go index a4fc4e33b8..1f3b206996 100644 --- a/services/antivirus/pkg/scanners/icap.go +++ b/services/antivirus/pkg/scanners/icap.go @@ -86,6 +86,7 @@ func (s ICAP) Scan(in Input) (Result, error) { if err != nil { return result, err } + result.ScanTime = time.Now() // TODO: make header configurable. 
See oc10 documentation: https://doc.owncloud.com/server/10.12/admin_manual/configuration/server/virus-scanner-support.html if data, infected := res.Header["X-Infection-Found"]; infected { diff --git a/services/antivirus/pkg/service/service.go b/services/antivirus/pkg/service/service.go index a1d41c651c..099228dd0a 100644 --- a/services/antivirus/pkg/service/service.go +++ b/services/antivirus/pkg/service/service.go @@ -9,6 +9,7 @@ import ( "io" "net/http" "os" + "sync" "time" "github.com/cs3org/reva/v2/pkg/bytesize" @@ -90,7 +91,7 @@ func (av Antivirus) Run() error { evtsCfg := av.c.Events var rootCAPool *x509.CertPool - if evtsCfg.TLSRootCACertificate != "" { + if av.c.Events.TLSRootCACertificate != "" { rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate) if err != nil { return err @@ -103,7 +104,7 @@ func (av Antivirus) Run() error { rootCAPool = x509.NewCertPool() rootCAPool.AppendCertsFromPEM(certBytes.Bytes()) - evtsCfg.TLSInsecure = false + av.c.Events.TLSInsecure = false } natsStream, err := stream.NatsFromConfig(av.c.Service.Name, false, stream.NatsConfig(av.c.Events)) @@ -116,21 +117,27 @@ func (av Antivirus) Run() error { return err } - for e := range ch { - err := av.processEvent(e, natsStream) - if err != nil { - switch { - case errors.Is(err, ErrFatal): - return err - case errors.Is(err, ErrEvent): - // Right now logging of these happens in the processEvent method, might be cleaner to do it here. 
- continue - default: - av.l.Fatal().Err(err).Msg("unknown error - exiting") + wg := sync.WaitGroup{} + for i := 0; i < av.c.Workers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for e := range ch { + err := av.processEvent(e, natsStream) + if err != nil { + switch { + case errors.Is(err, ErrFatal): + av.l.Fatal().Err(err).Msg("fatal error - exiting") + case errors.Is(err, ErrEvent): + av.l.Error().Err(err).Msg("continuing") + default: + av.l.Fatal().Err(err).Msg("unknown error - exiting") + } + } } - } - + }() } + wg.Wait() return nil } @@ -168,10 +175,12 @@ func (av Antivirus) processEvent(e events.Event, s events.Publisher) error { av.l.Debug().Str("uploadid", ev.UploadID).Str("filename", ev.Filename).Msg("Starting virus scan.") var errmsg string + start := time.Now() res, err := av.process(ev) if err != nil { errmsg = err.Error() } + duration := time.Since(start) var outcome events.PostprocessingOutcome switch { @@ -186,7 +195,7 @@ func (av Antivirus) processEvent(e events.Event, s events.Publisher) error { outcome = events.PPOutcomeAbort } - av.l.Info().Str("uploadid", ev.UploadID).Interface("resourceID", ev.ResourceID).Str("virus", res.Description).Str("outcome", string(outcome)).Str("filename", ev.Filename).Str("user", ev.ExecutingUser.GetId().GetOpaqueId()).Bool("infected", res.Infected).Msg("File scanned") + av.l.Info().Str("uploadid", ev.UploadID).Interface("resourceID", ev.ResourceID).Str("virus", res.Description).Str("outcome", string(outcome)).Str("filename", ev.Filename).Str("user", ev.ExecutingUser.GetId().GetOpaqueId()).Bool("infected", res.Infected).Dur("duration", duration).Msg("File scanned") if err := events.Publish(ctx, s, events.PostprocessingStepFinished{ FinishedStep: events.PPStepAntivirus, Outcome: outcome,