diff --git a/services/postprocessing/Makefile b/services/postprocessing/Makefile
new file mode 100644
index 000000000..0bdc11572
--- /dev/null
+++ b/services/postprocessing/Makefile
@@ -0,0 +1,37 @@
+SHELL := bash
+NAME := postprocessing
+
+include ../../.make/recursion.mk
+
+############ tooling ############
+ifneq (, $(shell command -v go 2> /dev/null)) # suppress `command not found warnings` for non go targets in CI
+include ../../.bingo/Variables.mk
+endif
+
+############ go tooling ############
+include ../../.make/go.mk
+
+############ release ############
+include ../../.make/release.mk
+
+############ docs generate ############
+include ../../.make/docs.mk
+
+.PHONY: docs-generate
+docs-generate: config-docs-generate
+
+############ generate ############
+include ../../.make/generate.mk
+
+.PHONY: ci-go-generate
+ci-go-generate: # CI runs ci-node-generate automatically before this target
+
+.PHONY: ci-node-generate
+ci-node-generate:
+
+############ licenses ############
+.PHONY: ci-node-check-licenses
+ci-node-check-licenses:
+
+.PHONY: ci-node-save-licenses
+ci-node-save-licenses:
diff --git a/services/postprocessing/cmd/postprocessing/blubb.txt b/services/postprocessing/cmd/postprocessing/blubb.txt
new file mode 100644
index 000000000..2fb06510d
Binary files /dev/null and b/services/postprocessing/cmd/postprocessing/blubb.txt differ
diff --git a/services/postprocessing/cmd/postprocessing/main.go b/services/postprocessing/cmd/postprocessing/main.go
new file mode 100644
index 000000000..ddc191e3a
--- /dev/null
+++ b/services/postprocessing/cmd/postprocessing/main.go
@@ -0,0 +1,18 @@
+// Command postprocessing runs the command line interface of the postprocessing service.
+package main
+
+import (
+	"os"
+
+	"github.com/owncloud/ocis/v2/services/postprocessing/pkg/command"
+	"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config/defaults"
+)
+
+// main executes the postprocessing command with the default configuration and
+// exits with status 1 if the command returns an error.
+func main() {
+	cfg := defaults.DefaultConfig()
+	if err := command.Execute(cfg); err != nil {
+		os.Exit(1)
+	}
+}
diff --git a/services/postprocessing/pkg/command/health.go
b/services/postprocessing/pkg/command/health.go
new file mode 100644
index 000000000..0a9fe13bd
--- /dev/null
+++ b/services/postprocessing/pkg/command/health.go
@@ -0,0 +1,20 @@
+package command
+
+import (
+	"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
+	"github.com/urfave/cli/v2"
+)
+
+// Health returns the "health" CLI command. The check itself is not
+// implemented yet, so the command always succeeds.
+func Health(cfg *config.Config) *cli.Command {
+	healthCmd := &cli.Command{
+		Name:  "health",
+		Usage: "Check health status",
+	}
+	healthCmd.Action = func(c *cli.Context) error {
+		// Not implemented
+		return nil
+	}
+	return healthCmd
+}
diff --git a/services/postprocessing/pkg/command/root.go b/services/postprocessing/pkg/command/root.go
new file mode 100644
index 000000000..0b558d9b7
--- /dev/null
+++ b/services/postprocessing/pkg/command/root.go
@@ -0,0 +1,54 @@
+package command
+
+import (
+	"context"
+	"os"
+
+	"github.com/owncloud/ocis/v2/ocis-pkg/clihelper"
+	ociscfg "github.com/owncloud/ocis/v2/ocis-pkg/config"
+	"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
+	"github.com/thejerf/suture/v4"
+	"github.com/urfave/cli/v2"
+)
+
+// GetCommands returns every CLI command offered by this service.
+func GetCommands(cfg *config.Config) cli.Commands {
+	// start this service
+	commands := cli.Commands{Server(cfg)}
+
+	// interaction with this service: nothing yet
+
+	// infos about this service
+	return append(commands, Health(cfg), Version(cfg))
+}
+
+// Execute builds the postprocessing CLI app and runs it with the arguments
+// of the current process.
+func Execute(cfg *config.Config) error {
+	app := &cli.App{
+		Name:     "postprocessing",
+		Usage:    "starts postprocessing service",
+		Commands: GetCommands(cfg),
+	}
+
+	return clihelper.DefaultApp(app).Run(os.Args)
+}
+
+// SutureService allows for the postprocessing command to be embedded and supervised by a suture supervisor tree.
+type SutureService struct {
+	cfg *config.Config
+}
+
+// NewSutureService returns a SutureService that runs on cfg.Postprocessing, after sharing cfg.Commons with it.
+func NewSutureService(cfg *ociscfg.Config) suture.Service {
+	cfg.Postprocessing.Commons = cfg.Commons
+	return SutureService{
+		cfg: cfg.Postprocessing,
+	}
+}
+
+// Serve implements suture.Service: it stores ctx in the service config and executes the command.
+func (s SutureService) Serve(ctx context.Context) error {
+	s.cfg.Context = ctx
+	return Execute(s.cfg)
+}
diff --git a/services/postprocessing/pkg/command/server.go b/services/postprocessing/pkg/command/server.go
new file mode 100644
index 000000000..5c95b42eb
--- /dev/null
+++ b/services/postprocessing/pkg/command/server.go
@@ -0,0 +1,90 @@
+package command
+
+import (
+	"crypto/tls"
+	"crypto/x509"
+	"fmt"
+	"os"
+
+	"github.com/cs3org/reva/v2/pkg/events/server"
+	"github.com/go-micro/plugins/v4/events/natsjs"
+	ociscrypto "github.com/owncloud/ocis/v2/ocis-pkg/crypto"
+	"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
+	"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config/parser"
+	"github.com/owncloud/ocis/v2/services/postprocessing/pkg/logging"
+	"github.com/owncloud/ocis/v2/services/postprocessing/pkg/service"
+	"github.com/urfave/cli/v2"
+)
+
+// Server is the entrypoint for the server command.
+//
+// Before parses the service configuration and terminates the process if that
+// fails. Action connects to the event stream, builds the postprocessing
+// service and blocks in its Run method.
+func Server(cfg *config.Config) *cli.Command {
+	return &cli.Command{
+		Name:     "server",
+		Usage:    fmt.Sprintf("start %s service without runtime (unsupervised mode)", cfg.Service.Name),
+		Category: "server",
+		Before: func(c *cli.Context) error {
+			err := parser.ParseConfig(cfg)
+			if err != nil {
+				fmt.Printf("%v", err)
+				os.Exit(1)
+			}
+			return err
+		},
+		Action: func(c *cli.Context) error {
+			logger := logging.Configure(cfg.Service.Name, cfg.Log)
+
+			// evtsCfg is a copy of the events config, so forcing TLSInsecure
+			// below does not modify cfg.
+			evtsCfg := cfg.Postprocessing.Events
+			var rootCAPool *x509.CertPool
+			if evtsCfg.TLSRootCACertificate != "" {
+				pool, err := loadRootCAPool(evtsCfg.TLSRootCACertificate)
+				if err != nil {
+					return err
+				}
+				rootCAPool = pool
+				// a configured root CA always implies certificate verification
+				evtsCfg.TLSInsecure = false
+			}
+
+			tlsConf := &tls.Config{
+				InsecureSkipVerify: evtsCfg.TLSInsecure, //nolint:gosec
+				RootCAs:            rootCAPool,
+			}
+			bus, err := server.NewNatsStream(
+				natsjs.TLSConfig(tlsConf),
+				natsjs.Address(evtsCfg.Endpoint),
+				natsjs.ClusterID(evtsCfg.Cluster),
+			)
+			if err != nil {
+				return err
+			}
+
+			svc, err := service.NewPostprocessingService(bus, logger, cfg.Postprocessing)
+			if err != nil {
+				return err
+			}
+			return svc.Run()
+		},
+	}
+}
+
+// loadRootCAPool reads the PEM encoded certificates from the file at path into
+// a certificate pool. The file is closed again before the function returns.
+func loadRootCAPool(path string) (*x509.CertPool, error) {
+	crtFile, err := os.Open(path)
+	if err != nil {
+		return nil, fmt.Errorf("opening root CA certificate %q: %w", path, err)
+	}
+	defer crtFile.Close()
+
+	pool, err := ociscrypto.NewCertPoolFromPEM(crtFile)
+	if err != nil {
+		return nil, fmt.Errorf("loading root CA certificate %q: %w", path, err)
+	}
+	return pool, nil
+}
diff --git a/services/postprocessing/pkg/command/version.go b/services/postprocessing/pkg/command/version.go
new file mode 100644
index 000000000..5d52c94a8
--- /dev/null
+++ b/services/postprocessing/pkg/command/version.go
@@ -0,0 +1,19 @@
+package command
+
+import (
+	"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
+	"github.com/urfave/cli/v2"
+)
+
+// Version returns the version command. Printing the versions is not implemented yet.
+func Version(cfg *config.Config) *cli.Command {
+	return &cli.Command{
+		Name:     "version",
+		Usage:    "print the version of this binary and the running extension instances",
+		Category: "info",
+		Action: func(c *cli.Context) error {
+			// not implemented
+			return nil
+		},
+	}
+}
diff --git a/services/postprocessing/pkg/config/config.go b/services/postprocessing/pkg/config/config.go
new file mode 100644
index 000000000..c81915e1e
--- /dev/null
+++ b/services/postprocessing/pkg/config/config.go
@@ -0,0 +1,38 @@
+package config
+
+import (
+	"context"
+	"time"
+
+	"github.com/owncloud/ocis/v2/ocis-pkg/shared"
+)
+
+// Config combines all available configuration parts.
+type Config struct {
+	Commons *shared.Commons `yaml:"-"` // don't use this directly as configuration for a service
+
+	Service Service `yaml:"-"`
+
+	Log *Log `yaml:"log"`
+
+	Postprocessing Postprocessing `yaml:"postprocessing"`
+
+	Context context.Context `yaml:"-"`
+}
+
+// Postprocessing defines the config options for the postprocessing service.
+type Postprocessing struct {
+	Events          Events        `yaml:"events"`
+	Virusscan       bool          `yaml:"virusscan" env:"POSTPROCESSING_VIRUSSCAN" desc:"should the system do a virusscan? Needs antivirus service"`
+	FTSIndex        bool          `yaml:"fulltextsearch" env:"POSTPROCESSING_FTS" desc:"should the system index files for fts? Needs search service"`
+	Delayprocessing time.Duration `yaml:"delayprocessing" env:"POSTPROCESSING_DELAY" desc:"the system sleeps for this time while postprocessing"`
+}
+
+// Events combines the configuration options for the event bus.
+type Events struct {
+	Endpoint string `yaml:"endpoint" env:"POSTPROCESSING_EVENTS_ENDPOINT" desc:"Endpoint of the event system."`
+	Cluster  string `yaml:"cluster" env:"POSTPROCESSING_EVENTS_CLUSTER" desc:"Cluster ID of the event system."`
+
+	TLSInsecure          bool   `yaml:"tls_insecure" env:"OCIS_INSECURE;POSTPROCESSING_EVENTS_TLS_INSECURE" desc:"Whether to skip verification of the server's TLS certificates."`
+	TLSRootCACertificate string `yaml:"tls_root_ca_certificate" env:"POSTPROCESSING_EVENTS_TLS_ROOT_CA_CERTIFICATE" desc:"The root CA certificate used to validate the server's TLS certificate. If provided POSTPROCESSING_EVENTS_TLS_INSECURE will be seen as false."`
+}
diff --git a/services/postprocessing/pkg/config/defaults/defaultconfig.go b/services/postprocessing/pkg/config/defaults/defaultconfig.go
new file mode 100644
index 000000000..c0f3343e5
--- /dev/null
+++ b/services/postprocessing/pkg/config/defaults/defaultconfig.go
@@ -0,0 +1,58 @@
+package defaults
+
+import (
+	"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
+)
+
+// FullDefaultConfig returns the default configuration after EnsureDefaults
+// and Sanitize have been applied to it.
+func FullDefaultConfig() *config.Config {
+	cfg := DefaultConfig()
+	EnsureDefaults(cfg)
+	Sanitize(cfg)
+	return cfg
+}
+
+// DefaultConfig returns the built-in configuration of the service. Only the
+// service name and the event bus connection are preset; no postprocessing
+// option is enabled.
+func DefaultConfig() *config.Config {
+	return &config.Config{
+		Service: config.Service{
+			Name: "postprocessing",
+		},
+		Postprocessing: config.Postprocessing{
+			Events: config.Events{
+				Endpoint: "127.0.0.1:9233",
+				Cluster:  "ocis-cluster",
+			},
+			// Virusscan: true,
+			// FTSIndex: true,
+		},
+	}
+}
+
+// EnsureDefaults guarantees that cfg.Log is non-nil. An already configured
+// log section is kept, otherwise the shared (commons) log settings are copied
+// and, if there are none, an empty log configuration is used.
+func EnsureDefaults(cfg *config.Config) {
+	// provide with defaults for shared logging, since we need a valid destination address for BindEnv.
+	switch {
+	case cfg.Log != nil:
+		// keep what has been configured explicitly
+	case cfg.Commons != nil && cfg.Commons.Log != nil:
+		cfg.Log = &config.Log{
+			Level:  cfg.Commons.Log.Level,
+			Pretty: cfg.Commons.Log.Pretty,
+			Color:  cfg.Commons.Log.Color,
+			File:   cfg.Commons.Log.File,
+		}
+	default:
+		cfg.Log = &config.Log{}
+	}
+}
+
+// Sanitize is the hook for cleaning up parsed configuration values; it currently does nothing.
+func Sanitize(cfg *config.Config) {
+	// nothing to sanitize here atm
+}
diff --git a/services/postprocessing/pkg/config/log.go b/services/postprocessing/pkg/config/log.go
new file mode 100644
index 000000000..bdbd36bd5
--- /dev/null
+++ b/services/postprocessing/pkg/config/log.go
@@ -0,0 +1,9 @@
+package config
+
+// Log defines the available log configuration.
+type Log struct {
+	Level  string `mapstructure:"level" env:"OCIS_LOG_LEVEL;POSTPROCESSING_LOG_LEVEL" desc:"The log level. Valid values are: \"panic\", \"fatal\", \"error\", \"warn\", \"info\", \"debug\", \"trace\"."`
+	Pretty bool   `mapstructure:"pretty" env:"OCIS_LOG_PRETTY;POSTPROCESSING_LOG_PRETTY" desc:"Activates pretty log output."`
+	Color  bool   `mapstructure:"color" env:"OCIS_LOG_COLOR;POSTPROCESSING_LOG_COLOR" desc:"Activates colorized log output."`
+	File   string `mapstructure:"file" env:"OCIS_LOG_FILE;POSTPROCESSING_LOG_FILE" desc:"The path to the log file. Activates logging to this file if set."`
+}
diff --git a/services/postprocessing/pkg/config/parser/parse.go b/services/postprocessing/pkg/config/parser/parse.go
new file mode 100644
index 000000000..03441f941
--- /dev/null
+++ b/services/postprocessing/pkg/config/parser/parse.go
@@ -0,0 +1,37 @@
+package parser
+
+import (
+	"errors"
+
+	ociscfg "github.com/owncloud/ocis/v2/ocis-pkg/config"
+	"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
+	"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config/defaults"
+
+	"github.com/owncloud/ocis/v2/ocis-pkg/config/envdecode"
+)
+
+// ParseConfig loads configuration from known paths.
+func ParseConfig(cfg *config.Config) error {
+	if _, err := ociscfg.BindSourcesToStructs(cfg.Service.Name, cfg); err != nil {
+		return err
+	}
+
+	defaults.EnsureDefaults(cfg)
+
+	// load all env variables relevant to the config in the current context.
+	err := envdecode.Decode(cfg)
+	// no environment variable set for this config is an expected "error"
+	if err != nil && !errors.Is(err, envdecode.ErrNoTargetFieldsAreSet) {
+		return err
+	}
+
+	defaults.Sanitize(cfg)
+
+	return Validate(cfg)
+}
+
+// Validate checks the parsed configuration. There is nothing to validate
+// yet, so it always returns nil.
+func Validate(cfg *config.Config) error {
+	return nil
+}
diff --git a/services/postprocessing/pkg/config/service.go b/services/postprocessing/pkg/config/service.go
new file mode 100644
index 000000000..d1eac383f
--- /dev/null
+++ b/services/postprocessing/pkg/config/service.go
@@ -0,0 +1,6 @@
+package config
+
+// Service defines the available service configuration.
+type Service struct {
+	Name string `yaml:"-"`
+}
diff --git a/services/postprocessing/pkg/logging/logging.go b/services/postprocessing/pkg/logging/logging.go
new file mode 100644
index 000000000..ae5a674a0
--- /dev/null
+++ b/services/postprocessing/pkg/logging/logging.go
@@ -0,0 +1,17 @@
+package logging
+
+import (
+	"github.com/owncloud/ocis/v2/ocis-pkg/log"
+	"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
+)
+
+// Configure initializes a service-specific logger instance.
+func Configure(name string, cfg *config.Log) log.Logger {
+	return log.NewLogger(
+		log.Name(name),
+		log.Level(cfg.Level),
+		log.Pretty(cfg.Pretty),
+		log.Color(cfg.Color),
+		log.File(cfg.File),
+	)
+}
diff --git a/services/postprocessing/pkg/postprocessing/postprocessing.go b/services/postprocessing/pkg/postprocessing/postprocessing.go
new file mode 100644
index 000000000..a7edeb7b1
--- /dev/null
+++ b/services/postprocessing/pkg/postprocessing/postprocessing.go
@@ -0,0 +1,134 @@
+package postprocessing
+
+import (
+	"time"
+
+	user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
+	provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
+	"github.com/cs3org/reva/v2/pkg/events"
+	"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
+)
+
+// stepInit is the key under which the BytesReceived event that started the
+// postprocessing is kept in the result map.
+const stepInit events.Postprocessingstep = "init"
+
+// Postprocessing handles postprocessing of a file
+type Postprocessing struct {
+	id         string
+	url        string
+	u          *user.User
+	m          map[events.Postprocessingstep]interface{} // events received so far, keyed by step
+	filename   string
+	filesize   uint64
+	resourceID *provider.ResourceId
+	c          config.Postprocessing
+	steps      []events.Postprocessingstep // enabled steps in execution order
+}
+
+// New returns a new postprocessing instance for the given upload. The steps
+// to execute are derived from c.
+func New(uploadID string, uploadURL string, executingUser *user.User, filename string, filesize uint64, resourceID *provider.ResourceId, c config.Postprocessing) *Postprocessing {
+	return &Postprocessing{
+		id:         uploadID,
+		url:        uploadURL,
+		u:          executingUser,
+		m:          make(map[events.Postprocessingstep]interface{}),
+		c:          c,
+		filename:   filename,
+		filesize:   filesize,
+		resourceID: resourceID,
+		steps:      getSteps(c),
+	}
+}
+
+// Init is the first step of the postprocessing. It returns the event that
+// starts the first configured step, or the finished event if no step is
+// configured.
+func (pp *Postprocessing) Init(ev events.BytesReceived) interface{} {
+	pp.m[stepInit] = ev
+
+	if len(pp.steps) == 0 {
+		return pp.finished(events.PPOutcomeContinue)
+	}
+
+	return pp.nextStep(pp.steps[0])
+}
+
+// Virusscan is the virusscanning step of the postprocessing. Any outcome
+// other than "continue" finishes the postprocessing with that outcome.
+func (pp *Postprocessing) Virusscan(ev events.VirusscanFinished) interface{} {
+	pp.m[events.PPStepAntivirus] = ev
+
+	if ev.Outcome != events.PPOutcomeContinue {
+		return pp.finished(ev.Outcome)
+	}
+	return pp.next(events.PPStepAntivirus)
+}
+
+// Delay will sleep the configured time then continue.
+// NOTE: the sleep blocks the calling goroutine for the whole delay.
+func (pp *Postprocessing) Delay(ev events.StartPostprocessingStep) interface{} {
+	pp.m[events.PPStepDelay] = ev
+	time.Sleep(pp.c.Delayprocessing)
+	return pp.next(events.PPStepDelay)
+}
+
+// next returns the event that starts the step following current, or the
+// finished event when current is the last step or not a configured step.
+func (pp *Postprocessing) next(current events.Postprocessingstep) interface{} {
+	for i, s := range pp.steps {
+		if s == current && i+1 < len(pp.steps) {
+			return pp.nextStep(pp.steps[i+1])
+		}
+	}
+	return pp.finished(events.PPOutcomeContinue)
+}
+
+// nextStep builds the event that asks for the given step to be started.
+func (pp *Postprocessing) nextStep(next events.Postprocessingstep) events.StartPostprocessingStep {
+	return events.StartPostprocessingStep{
+		UploadID:      pp.id,
+		URL:           pp.url,
+		ExecutingUser: pp.u,
+		Filename:      pp.filename,
+		Filesize:      pp.filesize,
+		ResourceID:    pp.resourceID,
+		StepToStart:   next,
+	}
+}
+
+// finished builds the event that ends the postprocessing with the given
+// outcome; it carries all events collected so far as result.
+func (pp *Postprocessing) finished(outcome events.PostprocessingOutcome) events.PostprocessingFinished {
+	return events.PostprocessingFinished{
+		UploadID:      pp.id,
+		Result:        pp.m,
+		ExecutingUser: pp.u,
+		Filename:      pp.filename,
+		Outcome:       outcome,
+	}
+}
+
+// getSteps returns the enabled postprocessing steps in the order they are
+// executed: delay, virus scan, full text search indexing.
+func getSteps(c config.Postprocessing) []events.Postprocessingstep {
+	// NOTE: first version only contains very basic configuration options
+	// But we aim for a system where postprocessing steps and their order can be configured per space
+	// ideally by the spaceadmin itself
+	// We need to iterate over configuring PP service when we see fit
+	var steps []events.Postprocessingstep
+	if c.Delayprocessing != 0 {
+		steps = append(steps, events.PPStepDelay)
+	}
+
+	if c.Virusscan {
+		steps = append(steps, events.PPStepAntivirus)
+	}
+
+	if c.FTSIndex {
+		steps = append(steps, events.PPStepFTS)
+	}
+
+	return steps
+}
diff --git a/services/postprocessing/pkg/service/service.go b/services/postprocessing/pkg/service/service.go
new file mode 100644
index 000000000..e442cb76a
--- /dev/null
+++ b/services/postprocessing/pkg/service/service.go
@@ -0,0 +1,84 @@
+package service
+
+import (
+	"github.com/cs3org/reva/v2/pkg/events"
+	"github.com/owncloud/ocis/v2/ocis-pkg/log"
+	"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
+	"github.com/owncloud/ocis/v2/services/postprocessing/pkg/postprocessing"
+)
+
+// PostprocessingService is an instance of the service handling postprocessing of files
+type PostprocessingService struct {
+	log    log.Logger
+	events <-chan interface{}
+	pub    events.Publisher
+	c      config.Postprocessing
+}
+
+// NewPostprocessingService returns a new instance of a postprocessing service.
+// It subscribes to the events the service reacts to and fails if the stream
+// cannot be consumed.
+func NewPostprocessingService(stream events.Stream, logger log.Logger, c config.Postprocessing) (*PostprocessingService, error) {
+	evs, err := events.Consume(stream, "postprocessing",
+		events.BytesReceived{},
+		events.StartPostprocessingStep{},
+		events.VirusscanFinished{},
+		events.UploadReady{},
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	return &PostprocessingService{
+		log:    logger,
+		events: evs,
+		pub:    stream,
+		c:      c,
+	}, nil
+}
+
+// Run to fulfil Runner interface. It handles incoming events until the event
+// channel is closed (returns nil) or an event cannot be published (returns
+// that error). The state of running uploads is only kept in memory.
+func (pps *PostprocessingService) Run() error {
+	current := make(map[string]*postprocessing.Postprocessing)
+	for e := range pps.events {
+		var next interface{}
+		switch ev := e.(type) {
+		case events.BytesReceived:
+			pp := postprocessing.New(ev.UploadID, ev.URL, ev.ExecutingUser, ev.Filename, ev.Filesize, ev.ResourceID, pps.c)
+			current[ev.UploadID] = pp
+			next = pp.Init(ev)
+		case events.VirusscanFinished:
+			pp := current[ev.UploadID]
+			if pp == nil {
+				// no current upload - this was an on demand scan
+				continue
+			}
+			next = pp.Virusscan(ev)
+		case events.StartPostprocessingStep:
+			if ev.StepToStart != events.PPStepDelay {
+				continue
+			}
+			pp := current[ev.UploadID]
+			if pp == nil {
+				// unknown upload, e.g. it was started before this instance
+				// was (re)started - calling Delay on it would panic
+				pps.log.Error().Str("uploadID", ev.UploadID).Msg("cannot delay unknown upload")
+				continue
+			}
+			next = pp.Delay(ev)
+		case events.UploadReady:
+			// the storage provider thinks the upload is done - so no need to keep it any more
+			delete(current, ev.UploadID)
+		}
+
+		if next != nil {
+			if err := events.Publish(pps.pub, next); err != nil {
+				pps.log.Error().Err(err).Msg("unable to publish event")
+				return err // we can't publish -> we are screwed
+			}
+		}
+	}
+	return nil
+}