add postprocessing service

Signed-off-by: jkoberg <jkoberg@owncloud.com>
This commit is contained in:
jkoberg
2022-12-08 11:00:42 +01:00
parent c2be1370ea
commit a004095a6c
15 changed files with 565 additions and 0 deletions

View File

@@ -0,0 +1,18 @@
package command
import (
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
"github.com/urfave/cli/v2"
)
// Health is the entrypoint for the health command.
func Health(cfg *config.Config) *cli.Command {
return &cli.Command{
Name: "health",
Usage: "Check health status",
Action: func(c *cli.Context) error {
// Not implemented
return nil
},
}
}

View File

@@ -0,0 +1,56 @@
package command
import (
"context"
"os"
"github.com/owncloud/ocis/v2/ocis-pkg/clihelper"
ociscfg "github.com/owncloud/ocis/v2/ocis-pkg/config"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
"github.com/thejerf/suture/v4"
"github.com/urfave/cli/v2"
)
// GetCommands provides all commands for this service
func GetCommands(cfg *config.Config) cli.Commands {
return []*cli.Command{
// start this service
Server(cfg),
// interaction with this service
// infos about this service
Health(cfg),
Version(cfg),
}
}
// Execute is the entry point for the postprocessing command.
func Execute(cfg *config.Config) error {
app := clihelper.DefaultApp(&cli.App{
Name: "postprocessing",
Usage: "starts postprocessing service",
Commands: GetCommands(cfg),
})
return app.Run(os.Args)
}
// SutureService allows for the postprocessing command to be embedded and supervised by a suture supervisor tree.
type SutureService struct {
cfg *config.Config
}
// NewSutureService creates a new postprocessing.SutureService
func NewSutureService(cfg *ociscfg.Config) suture.Service {
cfg.Postprocessing.Commons = cfg.Commons
return SutureService{
cfg: cfg.Postprocessing,
}
}
// Serve to implement Server interface
func (s SutureService) Serve(ctx context.Context) error {
s.cfg.Context = ctx
return Execute(s.cfg)
}

View File

@@ -0,0 +1,71 @@
package command
import (
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"github.com/cs3org/reva/v2/pkg/events/server"
"github.com/go-micro/plugins/v4/events/natsjs"
ociscrypto "github.com/owncloud/ocis/v2/ocis-pkg/crypto"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config/parser"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/logging"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/service"
"github.com/urfave/cli/v2"
)
// Server is the entrypoint for the server command.
func Server(cfg *config.Config) *cli.Command {
return &cli.Command{
Name: "server",
Usage: fmt.Sprintf("start %s service without runtime (unsupervised mode)", cfg.Service.Name),
Category: "server",
Before: func(c *cli.Context) error {
err := parser.ParseConfig(cfg)
if err != nil {
fmt.Printf("%v", err)
os.Exit(1)
}
return err
},
Action: func(c *cli.Context) error {
logger := logging.Configure(cfg.Service.Name, cfg.Log)
evtsCfg := cfg.Postprocessing.Events
var rootCAPool *x509.CertPool
if evtsCfg.TLSRootCACertificate != "" {
rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate)
if err != nil {
return err
}
rootCAPool, err = ociscrypto.NewCertPoolFromPEM(rootCrtFile)
if err != nil {
return err
}
evtsCfg.TLSInsecure = false
}
tlsConf := &tls.Config{
InsecureSkipVerify: evtsCfg.TLSInsecure, //nolint:gosec
RootCAs: rootCAPool,
}
bus, err := server.NewNatsStream(
natsjs.TLSConfig(tlsConf),
natsjs.Address(evtsCfg.Endpoint),
natsjs.ClusterID(evtsCfg.Cluster),
)
if err != nil {
return err
}
svc, err := service.NewPostprocessingService(bus, logger, cfg.Postprocessing)
if err != nil {
return err
}
return svc.Run()
},
}
}

View File

@@ -0,0 +1,19 @@
package command
import (
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
"github.com/urfave/cli/v2"
)
// Version prints the service versions of all running instances.
func Version(cfg *config.Config) *cli.Command {
return &cli.Command{
Name: "version",
Usage: "print the version of this binary and the running extension instances",
Category: "info",
Action: func(c *cli.Context) error {
// not implemented
return nil
},
}
}

View File

@@ -0,0 +1,38 @@
package config
import (
"context"
"time"
"github.com/owncloud/ocis/v2/ocis-pkg/shared"
)
// Config combines all available configuration parts.
type Config struct {
Commons *shared.Commons `yaml:"-"` // don't use this directly as configuration for a service
Service Service `yaml:"-"`
Log *Log `yaml:"log"`
Postprocessing Postprocessing `yaml:"postprocessing"`
Context context.Context `yaml:"-"`
}
// Postprocessing definces the config options for the postprocessing service.
type Postprocessing struct {
Events Events `yaml:"events"`
Virusscan bool `yaml:"virusscan" env:"POSTPROCESSING_VIRUSSCAN" desc:"should the system do a virusscan? Needs antivirus service"`
FTSIndex bool `yaml:"fulltextsearch" env:"POSTPROCESSING_FTS" desc:"should the system index files for fts? Needs search service"`
Delayprocessing time.Duration `yaml:"delayprocessing" env:"POSTPROCESSING_DELAY" desc:"the sytem sleeps for this time while postprocessing"`
}
// Events combines the configuration options for the event bus.
type Events struct {
Endpoint string `yaml:"endpoint" env:"POSTPROCESSING_EVENTS_ENDPOINT" desc:"Endpoint of the event system."`
Cluster string `yaml:"cluster" env:"POSTPROCESSING_EVENTS_CLUSTER" desc:"Cluster ID of the event system."`
TLSInsecure bool `yaml:"tls_insecure" env:"OCIS_INSECURE;SEARCH_EVENTS_TLS_INSECURE" desc:"Whether to verify the server TLS certificates."`
TLSRootCACertificate string `yaml:"tls_root_ca_certificate" env:"SEARCH_EVENTS_TLS_ROOT_CA_CERTIFICATE" desc:"The root CA certificate used to validate the server's TLS certificate. If provided SEARCH_EVENTS_TLS_INSECURE will be seen as false."`
}

View File

@@ -0,0 +1,46 @@
package defaults
import (
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
)
func FullDefaultConfig() *config.Config {
cfg := DefaultConfig()
EnsureDefaults(cfg)
Sanitize(cfg)
return cfg
}
func DefaultConfig() *config.Config {
return &config.Config{
Service: config.Service{
Name: "postprocessing",
},
Postprocessing: config.Postprocessing{
Events: config.Events{
Endpoint: "127.0.0.1:9233",
Cluster: "ocis-cluster",
},
// Virusscan: true,
// FTSIndex: true,
},
}
}
func EnsureDefaults(cfg *config.Config) {
// provide with defaults for shared logging, since we need a valid destination address for BindEnv.
if cfg.Log == nil && cfg.Commons != nil && cfg.Commons.Log != nil {
cfg.Log = &config.Log{
Level: cfg.Commons.Log.Level,
Pretty: cfg.Commons.Log.Pretty,
Color: cfg.Commons.Log.Color,
File: cfg.Commons.Log.File,
}
} else if cfg.Log == nil {
cfg.Log = &config.Log{}
}
}
func Sanitize(cfg *config.Config) {
// nothing to sanitize here atm
}

View File

@@ -0,0 +1,9 @@
package config
// Log defines the available log configuration.
type Log struct {
Level string `mapstructure:"level" env:"OCIS_LOG_LEVEL;POSTPROCESSING_LOG_LEVEL" desc:"The log level. Valid values are: \"panic\", \"fatal\", \"error\", \"warn\", \"info\", \"debug\", \"trace\"."`
Pretty bool `mapstructure:"pretty" env:"OCIS_LOG_PRETTY;POSTPROCESSING_LOG_PRETTY" desc:"Activates pretty log output."`
Color bool `mapstructure:"color" env:"OCIS_LOG_COLOR;POSTPROCESSING_LOG_COLOR" desc:"Activates colorized log output."`
File string `mapstructure:"file" env:"OCIS_LOG_FILE;POSTPROCESSING_LOG_FILE" desc:"The path to the log file. Activates logging to this file if set."`
}

View File

@@ -0,0 +1,37 @@
package parser
import (
"errors"
ociscfg "github.com/owncloud/ocis/v2/ocis-pkg/config"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config/defaults"
"github.com/owncloud/ocis/v2/ocis-pkg/config/envdecode"
)
// ParseConfig loads configuration from known paths.
func ParseConfig(cfg *config.Config) error {
_, err := ociscfg.BindSourcesToStructs(cfg.Service.Name, cfg)
if err != nil {
return err
}
defaults.EnsureDefaults(cfg)
// load all env variables relevant to the config in the current context.
if err := envdecode.Decode(cfg); err != nil {
// no environment variable set for this config is an expected "error"
if !errors.Is(err, envdecode.ErrNoTargetFieldsAreSet) {
return err
}
}
defaults.Sanitize(cfg)
return Validate(cfg)
}
func Validate(cfg *config.Config) error {
return nil
}

View File

@@ -0,0 +1,6 @@
package config
// Service defines the available service configuration.
type Service struct {
Name string `yaml:"-"`
}

View File

@@ -0,0 +1,17 @@
package logging
import (
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
)
// LoggerFromConfig initializes a service-specific logger instance.
func Configure(name string, cfg *config.Log) log.Logger {
return log.NewLogger(
log.Name(name),
log.Level(cfg.Level),
log.Pretty(cfg.Pretty),
log.Color(cfg.Color),
log.File(cfg.File),
)
}

View File

@@ -0,0 +1,122 @@
package postprocessing
import (
"time"
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
)
// Postprocessing handles postprocessing of a file
type Postprocessing struct {
id string
url string
u *user.User
m map[events.Postprocessingstep]interface{}
filename string
filesize uint64
resourceId *provider.ResourceId
c config.Postprocessing
steps []events.Postprocessingstep
}
// New returns a new postprocessing instance
func New(uploadID string, uploadURL string, user *user.User, filename string, filesize uint64, resourceId *provider.ResourceId, c config.Postprocessing) *Postprocessing {
return &Postprocessing{
id: uploadID,
url: uploadURL,
u: user,
m: make(map[events.Postprocessingstep]interface{}),
c: c,
filename: filename,
filesize: filesize,
resourceId: resourceId,
steps: getSteps(c),
}
}
// Init is the first step of the postprocessing
func (pp *Postprocessing) Init(ev events.BytesReceived) interface{} {
pp.m["init"] = ev
if len(pp.steps) == 0 {
return pp.finished(events.PPOutcomeContinue)
}
return pp.nextStep(pp.steps[0])
}
// Virusscan is the virusscanning step of the postprocessing
func (pp *Postprocessing) Virusscan(ev events.VirusscanFinished) interface{} {
pp.m[events.PPStepAntivirus] = ev
switch ev.Outcome {
case events.PPOutcomeContinue:
return pp.next(events.PPStepAntivirus)
default:
return pp.finished(ev.Outcome)
}
}
// Delay will sleep the configured time then continue
func (pp *Postprocessing) Delay(ev events.StartPostprocessingStep) interface{} {
pp.m[events.PPStepDelay] = ev
time.Sleep(pp.c.Delayprocessing)
return pp.next(events.PPStepDelay)
}
func (pp *Postprocessing) next(current events.Postprocessingstep) interface{} {
l := len(pp.steps)
for i, s := range pp.steps {
if s == current && i+1 < l {
return pp.nextStep(pp.steps[i+1])
}
}
return pp.finished(events.PPOutcomeContinue)
}
func (pp *Postprocessing) nextStep(next events.Postprocessingstep) events.StartPostprocessingStep {
return events.StartPostprocessingStep{
UploadID: pp.id,
URL: pp.url,
ExecutingUser: pp.u,
Filename: pp.filename,
Filesize: pp.filesize,
ResourceID: pp.resourceId,
StepToStart: next,
}
}
func (pp *Postprocessing) finished(outcome events.PostprocessingOutcome) events.PostprocessingFinished {
return events.PostprocessingFinished{
UploadID: pp.id,
Result: pp.m,
ExecutingUser: pp.u,
Filename: pp.filename,
Outcome: outcome,
}
}
func getSteps(c config.Postprocessing) []events.Postprocessingstep {
// NOTE: first version only contains very basic configuration options
// But we aim for a system where postprocessing steps and their order can be configured per space
// ideally by the spaceadmin itself
// We need to iterate over configuring PP service when we see fit
var steps []events.Postprocessingstep
if c.Delayprocessing != 0 {
steps = append(steps, events.PPStepDelay)
}
if c.Virusscan {
steps = append(steps, events.PPStepAntivirus)
}
if c.FTSIndex {
steps = append(steps, events.PPStepFTS)
}
return steps
}

View File

@@ -0,0 +1,75 @@
package service
import (
"github.com/cs3org/reva/v2/pkg/events"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/postprocessing"
)
// PostprocessingService is an instance of the service handling postprocessing of files
type PostprocessingService struct {
log log.Logger
events <-chan interface{}
pub events.Publisher
c config.Postprocessing
}
// NewPostprocessingService returns a new instance of a postprocessing service
func NewPostprocessingService(stream events.Stream, logger log.Logger, c config.Postprocessing) (*PostprocessingService, error) {
evs, err := events.Consume(stream, "postprocessing",
events.BytesReceived{},
events.StartPostprocessingStep{},
events.VirusscanFinished{},
events.UploadReady{},
)
if err != nil {
return nil, err
}
return &PostprocessingService{
log: logger,
events: evs,
pub: stream,
c: c,
}, nil
}
// Run to fulfil Runner interface
func (pps *PostprocessingService) Run() error {
current := make(map[string]*postprocessing.Postprocessing)
for e := range pps.events {
var next interface{}
switch ev := e.(type) {
case events.BytesReceived:
pp := postprocessing.New(ev.UploadID, ev.URL, ev.ExecutingUser, ev.Filename, ev.Filesize, ev.ResourceID, pps.c)
current[ev.UploadID] = pp
next = pp.Init(ev)
case events.VirusscanFinished:
pp := current[ev.UploadID]
if pp == nil {
// no current upload - this was an on demand scan
continue
}
next = pp.Virusscan(ev)
case events.StartPostprocessingStep:
if ev.StepToStart != events.PPStepDelay {
continue
}
pp := current[ev.UploadID]
next = pp.Delay(ev)
case events.UploadReady:
// the storage provider thinks the upload is done - so no need to keep it any more
delete(current, ev.UploadID)
}
if next != nil {
if err := events.Publish(pps.pub, next); err != nil {
pps.log.Error().Err(err).Msg("unable to publish event")
return err // we can't publish -> we are screwed
}
}
}
return nil
}