Merge pull request #10383 from owncloud/antivirus-workers

Antivirus workers
This commit is contained in:
Roman Perekhod
2024-10-23 10:38:06 +02:00
committed by GitHub
6 changed files with 47 additions and 20 deletions

View File

@@ -0,0 +1,5 @@
Bugfix: make antivirus workers configurable
We made the number of goroutines that pull events from the queue configurable.
https://github.com/owncloud/ocis/pull/10383

View File

@@ -15,6 +15,9 @@ The antivirus service currently supports [ICAP](https://tools.ietf.org/html/rfc3
Several factors can make it necessary to limit the maximum file size the antivirus service will use for scanning. Use the `ANTIVIRUS_MAX_SCAN_SIZE` environment variable to scan only a given number of bytes. Obviously, it is recommended to scan the whole file, but several factors like scanner type and version, bandwidth, performance issues, etc. might make a limit necessary.
> [!CAUTION]
> Streaming of files to the virus scan service still [needs to be implemented](https://github.com/owncloud/ocis/issues/6803). To prevent OOM errors, `ANTIVIRUS_MAX_SCAN_SIZE` needs to be set lower than the available RAM.
### Infected File Handling
The antivirus service allows three different ways of handling infected files. Those can be set via the `ANTIVIRUS_INFECTED_FILE_HANDLING` environment variable:
@@ -36,3 +39,9 @@ The antivirus service can scan files during `postprocessing`. `on demand` scanni
### Postprocessing
The antivirus service will scan files during postprocessing. It listens for a postprocessing step called `virusscan`. This step can be added in the environment variable `POSTPROCESSING_STEPS`. Read the documentation of the [postprocessing service](https://github.com/owncloud/ocis/tree/master/services/postprocessing) for more details.
The number of concurrent scans can be increased by setting `ANTIVIRUS_WORKERS`, but be aware that this will also increase the memory usage.
### Scaling in Kubernetes
In Kubernetes, `ANTIVIRUS_WORKERS` and `ANTIVIRUS_MAX_SCAN_SIZE` can be used to trigger the horizontal pod autoscaler by requesting a memory size that is below `ANTIVIRUS_MAX_SCAN_SIZE`. Keep in mind that each of the `ANTIVIRUS_WORKERS` goroutines might hold up to `ANTIVIRUS_MAX_SCAN_SIZE` of memory at the same time.

View File

@@ -10,7 +10,7 @@ type Config struct {
File string
Log *Log
Debug Debug `mask:"struct" yaml:"debug"`
Debug Debug `yaml:"debug" mask:"struct"`
Service Service `yaml:"-"`
@@ -18,10 +18,12 @@ type Config struct {
InfectedFileHandling string `yaml:"infected-file-handling" env:"ANTIVIRUS_INFECTED_FILE_HANDLING" desc:"Defines the behaviour when a virus has been found. Supported options are: 'delete', 'continue' and 'abort '. Delete will delete the file. Continue will mark the file as infected but continues further processing. Abort will keep the file in the uploads folder for further admin inspection and will not move it to its final destination." introductionVersion:"pre5.0"`
Events Events
Scanner Scanner
MaxScanSize string `yaml:"max-scan-size" env:"ANTIVIRUS_MAX_SCAN_SIZE" desc:"The maximum scan size the virus scanner can handle. Only this many bytes of a file will be scanned. 0 means unlimited and is the default. Usable common abbreviations: [KB, KiB, MB, MiB, GB, GiB, TB, TiB, PB, PiB, EB, EiB], example: 2GB." introductionVersion:"pre5.0"`
Workers int `yaml:"workers" env:"ANTIVIRUS_WORKERS" desc:"The number of concurrent go routines that fetch events from the event queue." introductionVersion:"%%NEXT%%"`
Context context.Context `yaml:"-" json:"-"`
Scanner Scanner
MaxScanSize string `yaml:"max-scan-size" env:"ANTIVIRUS_MAX_SCAN_SIZE" desc:"The maximum scan size the virus scanner can handle. Only this many bytes of a file will be scanned. 0 means unlimited and is the default. Usable common abbreviations: [KB, KiB, MB, MiB, GB, GiB, TB, TiB, PB, PiB, EB, EiB], example: 2GB." introductionVersion:"pre5.0"`
Context context.Context `json:"-" yaml:"-"`
DebugScanOutcome string `yaml:"-" env:"ANTIVIRUS_DEBUG_SCAN_OUTCOME" desc:"A predefined outcome for virus scanning, FOR DEBUG PURPOSES ONLY! (example values: 'found,infected')" introductionVersion:"pre5.0"`
}

View File

@@ -28,6 +28,7 @@ func DefaultConfig() *config.Config {
Endpoint: "127.0.0.1:9233",
Cluster: "ocis-cluster",
},
Workers: 10,
InfectedFileHandling: "delete",
Scanner: config.Scanner{
Type: "clamav",

View File

@@ -86,6 +86,7 @@ func (s ICAP) Scan(in Input) (Result, error) {
if err != nil {
return result, err
}
result.ScanTime = time.Now()
// TODO: make header configurable. See oc10 documentation: https://doc.owncloud.com/server/10.12/admin_manual/configuration/server/virus-scanner-support.html
if data, infected := res.Header["X-Infection-Found"]; infected {

View File

@@ -9,6 +9,7 @@ import (
"io"
"net/http"
"os"
"sync"
"time"
"github.com/cs3org/reva/v2/pkg/bytesize"
@@ -90,7 +91,7 @@ func (av Antivirus) Run() error {
evtsCfg := av.c.Events
var rootCAPool *x509.CertPool
if evtsCfg.TLSRootCACertificate != "" {
if av.c.Events.TLSRootCACertificate != "" {
rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate)
if err != nil {
return err
@@ -103,7 +104,7 @@ func (av Antivirus) Run() error {
rootCAPool = x509.NewCertPool()
rootCAPool.AppendCertsFromPEM(certBytes.Bytes())
evtsCfg.TLSInsecure = false
av.c.Events.TLSInsecure = false
}
natsStream, err := stream.NatsFromConfig(av.c.Service.Name, false, stream.NatsConfig(av.c.Events))
@@ -116,21 +117,27 @@ func (av Antivirus) Run() error {
return err
}
for e := range ch {
err := av.processEvent(e, natsStream)
if err != nil {
switch {
case errors.Is(err, ErrFatal):
return err
case errors.Is(err, ErrEvent):
// Right now logging of these happens in the processEvent method, might be cleaner to do it here.
continue
default:
av.l.Fatal().Err(err).Msg("unknown error - exiting")
wg := sync.WaitGroup{}
for i := 0; i < av.c.Workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for e := range ch {
err := av.processEvent(e, natsStream)
if err != nil {
switch {
case errors.Is(err, ErrFatal):
av.l.Fatal().Err(err).Msg("fatal error - exiting")
case errors.Is(err, ErrEvent):
av.l.Error().Err(err).Msg("continuing")
default:
av.l.Fatal().Err(err).Msg("unknown error - exiting")
}
}
}
}
}()
}
wg.Wait()
return nil
}
@@ -168,10 +175,12 @@ func (av Antivirus) processEvent(e events.Event, s events.Publisher) error {
av.l.Debug().Str("uploadid", ev.UploadID).Str("filename", ev.Filename).Msg("Starting virus scan.")
var errmsg string
start := time.Now()
res, err := av.process(ev)
if err != nil {
errmsg = err.Error()
}
duration := time.Since(start)
var outcome events.PostprocessingOutcome
switch {
@@ -186,7 +195,7 @@ func (av Antivirus) processEvent(e events.Event, s events.Publisher) error {
outcome = events.PPOutcomeAbort
}
av.l.Info().Str("uploadid", ev.UploadID).Interface("resourceID", ev.ResourceID).Str("virus", res.Description).Str("outcome", string(outcome)).Str("filename", ev.Filename).Str("user", ev.ExecutingUser.GetId().GetOpaqueId()).Bool("infected", res.Infected).Msg("File scanned")
av.l.Info().Str("uploadid", ev.UploadID).Interface("resourceID", ev.ResourceID).Str("virus", res.Description).Str("outcome", string(outcome)).Str("filename", ev.Filename).Str("user", ev.ExecutingUser.GetId().GetOpaqueId()).Bool("infected", res.Infected).Dur("duration", duration).Msg("File scanned")
if err := events.Publish(ctx, s, events.PostprocessingStepFinished{
FinishedStep: events.PPStepAntivirus,
Outcome: outcome,