From 61e6f1edbaab65ca741cfe584f130984036ea0c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Tue, 22 Oct 2024 14:15:27 +0200 Subject: [PATCH 1/4] make antivirus workers configurable MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- services/antivirus/pkg/config/config.go | 6 +++-- services/antivirus/pkg/scanners/icap.go | 1 + services/antivirus/pkg/service/service.go | 33 +++++++++++++---------- 3 files changed, 24 insertions(+), 16 deletions(-) diff --git a/services/antivirus/pkg/config/config.go b/services/antivirus/pkg/config/config.go index fd71d3fd24..7aee2920a1 100644 --- a/services/antivirus/pkg/config/config.go +++ b/services/antivirus/pkg/config/config.go @@ -18,8 +18,10 @@ type Config struct { InfectedFileHandling string `yaml:"infected-file-handling" env:"ANTIVIRUS_INFECTED_FILE_HANDLING" desc:"Defines the behaviour when a virus has been found. Supported options are: 'delete', 'continue' and 'abort '. Delete will delete the file. Continue will mark the file as infected but continues further processing. Abort will keep the file in the uploads folder for further admin inspection and will not move it to its final destination." introductionVersion:"pre5.0"` Events Events - Scanner Scanner - MaxScanSize string `yaml:"max-scan-size" env:"ANTIVIRUS_MAX_SCAN_SIZE" desc:"The maximum scan size the virus scanner can handle. Only this many bytes of a file will be scanned. 0 means unlimited and is the default. Usable common abbreviations: [KB, KiB, MB, MiB, GB, GiB, TB, TiB, PB, PiB, EB, EiB], example: 2GB." introductionVersion:"pre5.0"` + Workers int `yaml:"workers" env:"ANTIVIRUS_WORKERS" desc:"The number of concurrent go routines that fetch events from the event queue." introductionVersion:"6.7"` + + Scanner Scanner + MaxScanSize string `yaml:"max-scan-size" env:"ANTIVIRUS_MAX_SCAN_SIZE" desc:"The maximum scan size the virus scanner can handle. Only this many bytes of a file will be scanned. 0 means unlimited and is the default. Usable common abbreviations: [KB, KiB, MB, MiB, GB, GiB, TB, TiB, PB, PiB, EB, EiB], example: 2GB." introductionVersion:"pre5.0"` Context context.Context `yaml:"-" json:"-"` diff --git a/services/antivirus/pkg/scanners/icap.go b/services/antivirus/pkg/scanners/icap.go index a4fc4e33b8..1f3b206996 100644 --- a/services/antivirus/pkg/scanners/icap.go +++ b/services/antivirus/pkg/scanners/icap.go @@ -86,6 +86,7 @@ func (s ICAP) Scan(in Input) (Result, error) { if err != nil { return result, err } + result.ScanTime = time.Now() // TODO: make header configurable. See oc10 documentation: https://doc.owncloud.com/server/10.12/admin_manual/configuration/server/virus-scanner-support.html if data, infected := res.Header["X-Infection-Found"]; infected { diff --git a/services/antivirus/pkg/service/service.go b/services/antivirus/pkg/service/service.go index a1d41c651c..52853a1b6f 100644 --- a/services/antivirus/pkg/service/service.go +++ b/services/antivirus/pkg/service/service.go @@ -116,20 +116,23 @@ func (av Antivirus) Run() error { return err } - for e := range ch { - err := av.processEvent(e, natsStream) - if err != nil { - switch { - case errors.Is(err, ErrFatal): - return err - case errors.Is(err, ErrEvent): - // Right now logging of these happens in the processEvent method, might be cleaner to do it here. - continue - default: - av.l.Fatal().Err(err).Msg("unknown error - exiting") + // Spawn workers that'll concurrently work the queue + for i := 0; i < av.c.Workers; i++ { + go (func() { + for e := range ch { + err := av.processEvent(e, natsStream) + if err != nil { + switch { + case errors.Is(err, ErrFatal): + av.l.Fatal().Err(err).Msg("fatal error - exiting") + case errors.Is(err, ErrEvent): + av.l.Error().Err(err).Msg("continuing") + default: + av.l.Fatal().Err(err).Msg("unknown error - exiting") + } + } } - } - + })() } return nil @@ -168,10 +171,12 @@ func (av Antivirus) processEvent(e events.Event, s events.Publisher) error { av.l.Debug().Str("uploadid", ev.UploadID).Str("filename", ev.Filename).Msg("Starting virus scan.") var errmsg string + start := time.Now() res, err := av.process(ev) if err != nil { errmsg = err.Error() } + duration := time.Since(start) var outcome events.PostprocessingOutcome switch { @@ -186,7 +191,7 @@ func (av Antivirus) processEvent(e events.Event, s events.Publisher) error { outcome = events.PPOutcomeAbort } - av.l.Info().Str("uploadid", ev.UploadID).Interface("resourceID", ev.ResourceID).Str("virus", res.Description).Str("outcome", string(outcome)).Str("filename", ev.Filename).Str("user", ev.ExecutingUser.GetId().GetOpaqueId()).Bool("infected", res.Infected).Msg("File scanned") + av.l.Info().Str("uploadid", ev.UploadID).Interface("resourceID", ev.ResourceID).Str("virus", res.Description).Str("outcome", string(outcome)).Str("filename", ev.Filename).Str("user", ev.ExecutingUser.GetId().GetOpaqueId()).Bool("infected", res.Infected).Dur("duration", duration).Msg("File scanned") if err := events.Publish(ctx, s, events.PostprocessingStepFinished{ FinishedStep: events.PPStepAntivirus, Outcome: outcome, From 775f75f7fc7f99b971b96ad4d9560c5c5370d78e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Tue, 22 Oct 2024 15:15:53 +0200 Subject: [PATCH 2/4] configurable antivirus workers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- changelog/unreleased/antivirus-workers.md | 5 +++++ services/antivirus/README.md | 9 +++++++++ .../antivirus/pkg/config/defaults/defaultconfig.go | 1 + services/antivirus/pkg/service/service.go | 10 +++++++--- 4 files changed, 22 insertions(+), 3 deletions(-) create mode 100644 changelog/unreleased/antivirus-workers.md diff --git a/changelog/unreleased/antivirus-workers.md b/changelog/unreleased/antivirus-workers.md new file mode 100644 index 0000000000..6546284275 --- /dev/null +++ b/changelog/unreleased/antivirus-workers.md @@ -0,0 +1,5 @@ +Bugfix: make antivirus workers configurable + +We made the number of go routines that pull events from the queue configurable. + +https://github.com/owncloud/ocis/pull/10383 diff --git a/services/antivirus/README.md b/services/antivirus/README.md index 09a3352298..caf7e99004 100644 --- a/services/antivirus/README.md +++ b/services/antivirus/README.md @@ -15,6 +15,9 @@ The antivirus service currently supports [ICAP](https://tools.ietf.org/html/rfc3 Several factors can make it necessary to limit the maximum filesize the antivirus service will use for scanning. Use the `ANTIVIRUS_MAX_SCAN_SIZE` environment variable to scan only a given amount of bytes. Obviously, it is recommended to scan the whole file, but several factors like scanner type and version, bandwidth, performance issues, etc. might make a limit necessary. +> [!CAUTION] +> Streaming of files to the virus scan service still [needs to be implemented](https://github.com/owncloud/ocis/issues/6803). To prevent OOM errors `ANTIVIRUS_MAX_SCAN_SIZE` needs to be set lower than available ram. + ### Infected File Handling The antivirus service allows three different ways of handling infected files. Those can be set via the `ANTIVIRUS_INFECTED_FILE_HANDLING` environment variable: @@ -36,3 +39,9 @@ The antivirus service can scan files during `postprocessing`. `on demand` scanni ### Postprocessing The antivirus service will scan files during postprocessing. It listens for a postprocessing step called `virusscan`. This step can be added in the environment variable `POSTPROCESSING_STEPS`. Read the documentation of the [postprocessing service](https://github.com/owncloud/ocis/tree/master/services/postprocessing) for more details. + +The number of concurrent scans can be increased by setting `ANTIVIRUS_WORKERS`, but be aware that this will also increase the memory usage. + +### Scaling in Kubernetes + +In kubernetes `ANTIVIRUS_WORKERS` and `ANTIVIRUS_MAX_SCAN_SIZE` can be used to trigger the horizontal pod autoscaler by requesting a memory size that is below `ANTIVIRUS_MAX_SCAN_SIZE`. Keep in mind that `ANTIVIRUS_MAX_SCAN_SIZE` amount of memory might be held by `ANTIVIRUS_WORKERS` number of go routines. diff --git a/services/antivirus/pkg/config/defaults/defaultconfig.go b/services/antivirus/pkg/config/defaults/defaultconfig.go index 9d6291033f..24be86e502 100644 --- a/services/antivirus/pkg/config/defaults/defaultconfig.go +++ b/services/antivirus/pkg/config/defaults/defaultconfig.go @@ -28,6 +28,7 @@ func DefaultConfig() *config.Config { Endpoint: "127.0.0.1:9233", Cluster: "ocis-cluster", }, + Workers: 10, InfectedFileHandling: "delete", Scanner: config.Scanner{ Type: "clamav", diff --git a/services/antivirus/pkg/service/service.go b/services/antivirus/pkg/service/service.go index 52853a1b6f..103f680379 100644 --- a/services/antivirus/pkg/service/service.go +++ b/services/antivirus/pkg/service/service.go @@ -9,6 +9,7 @@ import ( "io" "net/http" "os" + "sync" "time" "github.com/cs3org/reva/v2/pkg/bytesize" @@ -116,9 +117,11 @@ func (av Antivirus) Run() error { return err } - // Spawn workers that'll concurrently work the queue + wg := sync.WaitGroup{} for i := 0; i < av.c.Workers; i++ { - go (func() { + wg.Add(1) + go func() { + defer wg.Done() for e := range ch { err := av.processEvent(e, natsStream) if err != nil { @@ -132,8 +135,9 @@ func (av Antivirus) Run() error { } } } - })() + }() } + wg.Wait() return nil } From cb6e3c37fc34d19dfb72fb916bc7d12f5c7f9527 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Tue, 22 Oct 2024 17:18:11 +0200 Subject: [PATCH 3/4] fix sonarcloud bugs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- services/antivirus/pkg/config/config.go | 4 ++-- services/antivirus/pkg/service/service.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/services/antivirus/pkg/config/config.go b/services/antivirus/pkg/config/config.go index 7aee2920a1..84a6f1d320 100644 --- a/services/antivirus/pkg/config/config.go +++ b/services/antivirus/pkg/config/config.go @@ -10,7 +10,7 @@ type Config struct { File string Log *Log - Debug Debug `mask:"struct" yaml:"debug"` + Debug Debug `yaml:"debug" mask:"struct"` Service Service `yaml:"-"` @@ -23,7 +23,7 @@ type Config struct { Scanner Scanner MaxScanSize string `yaml:"max-scan-size" env:"ANTIVIRUS_MAX_SCAN_SIZE" desc:"The maximum scan size the virus scanner can handle. Only this many bytes of a file will be scanned. 0 means unlimited and is the default. Usable common abbreviations: [KB, KiB, MB, MiB, GB, GiB, TB, TiB, PB, PiB, EB, EiB], example: 2GB." introductionVersion:"pre5.0"` - Context context.Context `yaml:"-" json:"-"` + Context context.Context `json:"-" yaml:"-"` DebugScanOutcome string `yaml:"-" env:"ANTIVIRUS_DEBUG_SCAN_OUTCOME" desc:"A predefined outcome for virus scanning, FOR DEBUG PURPOSES ONLY! (example values: 'found,infected')" introductionVersion:"pre5.0"` } diff --git a/services/antivirus/pkg/service/service.go b/services/antivirus/pkg/service/service.go index 103f680379..099228dd0a 100644 --- a/services/antivirus/pkg/service/service.go +++ b/services/antivirus/pkg/service/service.go @@ -91,7 +91,7 @@ func (av Antivirus) Run() error { evtsCfg := av.c.Events var rootCAPool *x509.CertPool - if evtsCfg.TLSRootCACertificate != "" { + if av.c.Events.TLSRootCACertificate != "" { rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate) if err != nil { return err @@ -104,7 +104,7 @@ func (av Antivirus) Run() error { rootCAPool = x509.NewCertPool() rootCAPool.AppendCertsFromPEM(certBytes.Bytes()) - evtsCfg.TLSInsecure = false + av.c.Events.TLSInsecure = false } natsStream, err := stream.NatsFromConfig(av.c.Service.Name, false, stream.NatsConfig(av.c.Events)) From fb732c5b2aff5b1236bf0072bdb9454c5e002f50 Mon Sep 17 00:00:00 2001 From: Roman Perekhod Date: Wed, 23 Oct 2024 10:08:20 +0200 Subject: [PATCH 4/4] fix config --- services/antivirus/pkg/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/antivirus/pkg/config/config.go b/services/antivirus/pkg/config/config.go index 84a6f1d320..3d58dcc053 100644 --- a/services/antivirus/pkg/config/config.go +++ b/services/antivirus/pkg/config/config.go @@ -18,7 +18,7 @@ type Config struct { InfectedFileHandling string `yaml:"infected-file-handling" env:"ANTIVIRUS_INFECTED_FILE_HANDLING" desc:"Defines the behaviour when a virus has been found. Supported options are: 'delete', 'continue' and 'abort '. Delete will delete the file. Continue will mark the file as infected but continues further processing. Abort will keep the file in the uploads folder for further admin inspection and will not move it to its final destination." introductionVersion:"pre5.0"` Events Events - Workers int `yaml:"workers" env:"ANTIVIRUS_WORKERS" desc:"The number of concurrent go routines that fetch events from the event queue." introductionVersion:"6.7"` + Workers int `yaml:"workers" env:"ANTIVIRUS_WORKERS" desc:"The number of concurrent go routines that fetch events from the event queue." introductionVersion:"%%NEXT%%"` Scanner Scanner MaxScanSize string `yaml:"max-scan-size" env:"ANTIVIRUS_MAX_SCAN_SIZE" desc:"The maximum scan size the virus scanner can handle. Only this many bytes of a file will be scanned. 0 means unlimited and is the default. Usable common abbreviations: [KB, KiB, MB, MiB, GB, GiB, TB, TiB, PB, PiB, EB, EiB], example: 2GB." introductionVersion:"pre5.0"`