Merge pull request #5207 from kobergj/AsyncPostprocessing

[full-ci] Async Postprocessing
This commit is contained in:
kobergj
2022-12-16 11:58:24 +01:00
committed by GitHub
24 changed files with 657 additions and 35 deletions
+4 -2
View File
@@ -873,11 +873,13 @@ def wopiValidatorTests(ctx, storage, accounts_hash_difficulty = 4):
"image": OC_CI_ALPINE,
"environment": {},
"commands": [
"curl -k 'https://ocis-server:9200/remote.php/webdav/test.wopitest' --fail --retry-connrefused --retry 7 --retry-all-errors -X PUT -u admin:admin -D headers.txt",
"curl -v -X PUT 'https://ocis-server:9200/remote.php/webdav/test.wopitest' -k --fail --retry-connrefused --retry 7 --retry-all-errors -u admin:admin -D headers.txt",
"cat headers.txt",
"export FILE_ID=$(cat headers.txt | sed -n -e 's/^.*Oc-Fileid: //p')",
"export URL=\"https://ocis-server:9200/app/open?app_name=FakeOffice&file_id=$FILE_ID\"",
"export URL=$(echo $URL | tr -d '[:cntrl:]')",
"curl -k -X POST \"$URL\" -u admin:admin -v > open.json",
"curl -v -X POST \"$URL\" -k --fail --retry-connrefused --retry 7 --retry-all-errors -u admin:admin > open.json",
"cat open.json",
"cat open.json | jq .form_parameters.access_token | tr -d '\"' > accesstoken",
"cat open.json | jq .form_parameters.access_token_ttl | tr -d '\"' > accesstokenttl",
"echo -n 'http://wopiserver:8880/wopi/files/' > wopisrc",
@@ -0,0 +1,5 @@
Enhancement: Async Postprocessing
Provides functionality for async postprocessing. This will allow the system to do the postprocessing (virusscan, copying of bytes to their final destination, ...) asynchronous to the users request. Major change when active.
https://github.com/owncloud/ocis/pull/5207
+1 -1
View File
@@ -11,7 +11,7 @@ require (
github.com/blevesearch/bleve/v2 v2.3.5
github.com/coreos/go-oidc/v3 v3.4.0
github.com/cs3org/go-cs3apis v0.0.0-20221012090518-ef2996678965
github.com/cs3org/reva/v2 v2.12.0
github.com/cs3org/reva/v2 v2.12.1-0.20221214090401-47e0591bb902
github.com/disintegration/imaging v1.6.2
github.com/ggwhite/go-masker v1.0.9
github.com/go-chi/chi/v5 v5.0.7
+2 -2
View File
@@ -343,8 +343,8 @@ github.com/crewjam/saml v0.4.6 h1:XCUFPkQSJLvzyl4cW9OvpWUbRf0gE7VUpU8ZnilbeM4=
github.com/crewjam/saml v0.4.6/go.mod h1:ZBOXnNPFzB3CgOkRm7Nd6IVdkG+l/wF+0ZXLqD96t1A=
github.com/cs3org/go-cs3apis v0.0.0-20221012090518-ef2996678965 h1:y4n2j68LLnvac+zw/al8MfPgO5aQiIwLmHM/JzYN8AM=
github.com/cs3org/go-cs3apis v0.0.0-20221012090518-ef2996678965/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
github.com/cs3org/reva/v2 v2.12.0 h1:KGQnNje13BbWuQBnxnWKyk+JjYTrETE8Q71KqKpzQQo=
github.com/cs3org/reva/v2 v2.12.0/go.mod h1:+lH5G0UmNjMNj4F0bDhbh+HqL1UihlbL8zPBa57Y2QI=
github.com/cs3org/reva/v2 v2.12.1-0.20221214090401-47e0591bb902 h1:r8K9y0RMFXjQlrbx17iQziWYhNyAYmh70ixaXbQHsHY=
github.com/cs3org/reva/v2 v2.12.1-0.20221214090401-47e0591bb902/go.mod h1:GpocVB1w6yxeSr1VBsO9jztmt1SyNC4lCwudLwDzxHQ=
github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8 h1:Z9lwXumT5ACSmJ7WGnFl+OMLLjpz5uR2fyz7dC255FI=
github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8/go.mod h1:4abs/jPXcmJzYoYGF91JF9Uq9s/KL5n1jvFDix8KcqY=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
+31 -29
View File
@@ -19,6 +19,7 @@ import (
notifications "github.com/owncloud/ocis/v2/services/notifications/pkg/config"
ocdav "github.com/owncloud/ocis/v2/services/ocdav/pkg/config"
ocs "github.com/owncloud/ocis/v2/services/ocs/pkg/config"
postprocessing "github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
proxy "github.com/owncloud/ocis/v2/services/proxy/pkg/config"
search "github.com/owncloud/ocis/v2/services/search/pkg/config"
settings "github.com/owncloud/ocis/v2/services/settings/pkg/config"
@@ -76,33 +77,34 @@ type Config struct {
AdminUserID string `yaml:"admin_user_id" env:"OCIS_ADMIN_USER_ID" desc:"ID of a user, that should receive admin privileges."`
Runtime Runtime `yaml:"runtime"`
AppProvider *appProvider.Config `yaml:"app_provider"`
AppRegistry *appRegistry.Config `yaml:"app_registry"`
Audit *audit.Config `yaml:"audit"`
AuthBasic *authbasic.Config `yaml:"auth_basic"`
AuthBearer *authbearer.Config `yaml:"auth_bearer"`
AuthMachine *authmachine.Config `yaml:"auth_machine"`
Frontend *frontend.Config `yaml:"frontend"`
Gateway *gateway.Config `yaml:"gateway"`
Graph *graph.Config `yaml:"graph"`
Groups *groups.Config `yaml:"groups"`
IDM *idm.Config `yaml:"idm"`
IDP *idp.Config `yaml:"idp"`
Nats *nats.Config `yaml:"nats"`
Notifications *notifications.Config `yaml:"notifications"`
OCDav *ocdav.Config `yaml:"ocdav"`
OCS *ocs.Config `yaml:"ocs"`
Proxy *proxy.Config `yaml:"proxy"`
Settings *settings.Config `yaml:"settings"`
Sharing *sharing.Config `yaml:"sharing"`
StorageSystem *storagesystem.Config `yaml:"storage_system"`
StoragePublicLink *storagepublic.Config `yaml:"storage_public"`
StorageShares *storageshares.Config `yaml:"storage_shares"`
StorageUsers *storageusers.Config `yaml:"storage_users"`
Store *store.Config `yaml:"store"`
Thumbnails *thumbnails.Config `yaml:"thumbnails"`
Users *users.Config `yaml:"users"`
Web *web.Config `yaml:"web"`
WebDAV *webdav.Config `yaml:"webdav"`
Search *search.Config `yaml:"search"`
AppProvider *appProvider.Config `yaml:"app_provider"`
AppRegistry *appRegistry.Config `yaml:"app_registry"`
Audit *audit.Config `yaml:"audit"`
AuthBasic *authbasic.Config `yaml:"auth_basic"`
AuthBearer *authbearer.Config `yaml:"auth_bearer"`
AuthMachine *authmachine.Config `yaml:"auth_machine"`
Frontend *frontend.Config `yaml:"frontend"`
Gateway *gateway.Config `yaml:"gateway"`
Graph *graph.Config `yaml:"graph"`
Groups *groups.Config `yaml:"groups"`
IDM *idm.Config `yaml:"idm"`
IDP *idp.Config `yaml:"idp"`
Nats *nats.Config `yaml:"nats"`
Notifications *notifications.Config `yaml:"notifications"`
OCDav *ocdav.Config `yaml:"ocdav"`
OCS *ocs.Config `yaml:"ocs"`
Postprocessing *postprocessing.Config `yaml:"postprocessing"`
Proxy *proxy.Config `yaml:"proxy"`
Settings *settings.Config `yaml:"settings"`
Sharing *sharing.Config `yaml:"sharing"`
StorageSystem *storagesystem.Config `yaml:"storage_system"`
StoragePublicLink *storagepublic.Config `yaml:"storage_public"`
StorageShares *storageshares.Config `yaml:"storage_shares"`
StorageUsers *storageusers.Config `yaml:"storage_users"`
Store *store.Config `yaml:"store"`
Thumbnails *thumbnails.Config `yaml:"thumbnails"`
Users *users.Config `yaml:"users"`
Web *web.Config `yaml:"web"`
WebDAV *webdav.Config `yaml:"webdav"`
Search *search.Config `yaml:"search"`
}
+2
View File
@@ -17,6 +17,7 @@ import (
notifications "github.com/owncloud/ocis/v2/services/notifications/pkg/config/defaults"
ocdav "github.com/owncloud/ocis/v2/services/ocdav/pkg/config/defaults"
ocs "github.com/owncloud/ocis/v2/services/ocs/pkg/config/defaults"
postprocessing "github.com/owncloud/ocis/v2/services/postprocessing/pkg/config/defaults"
proxy "github.com/owncloud/ocis/v2/services/proxy/pkg/config/defaults"
search "github.com/owncloud/ocis/v2/services/search/pkg/config/defaults"
settings "github.com/owncloud/ocis/v2/services/settings/pkg/config/defaults"
@@ -56,6 +57,7 @@ func DefaultConfig() *Config {
Notifications: notifications.DefaultConfig(),
OCDav: ocdav.DefaultConfig(),
OCS: ocs.DefaultConfig(),
Postprocessing: postprocessing.DefaultConfig(),
Proxy: proxy.DefaultConfig(),
Search: search.FullDefaultConfig(),
Settings: settings.DefaultConfig(),
+3 -1
View File
@@ -34,6 +34,7 @@ import (
notifications "github.com/owncloud/ocis/v2/services/notifications/pkg/command"
ocdav "github.com/owncloud/ocis/v2/services/ocdav/pkg/command"
ocs "github.com/owncloud/ocis/v2/services/ocs/pkg/command"
postprocessing "github.com/owncloud/ocis/v2/services/postprocessing/pkg/command"
proxy "github.com/owncloud/ocis/v2/services/proxy/pkg/command"
search "github.com/owncloud/ocis/v2/services/search/pkg/command"
settings "github.com/owncloud/ocis/v2/services/settings/pkg/command"
@@ -125,6 +126,7 @@ func NewService(options ...Option) (*Service, error) {
s.ServicesRegistry[opts.Config.AppProvider.Service.Name] = appProvider.NewSutureService
s.ServicesRegistry[opts.Config.Notifications.Service.Name] = notifications.NewSutureService
s.ServicesRegistry[opts.Config.Search.Service.Name] = search.NewSutureService
s.ServicesRegistry[opts.Config.Postprocessing.Service.Name] = postprocessing.NewSutureService
// populate delayed services
s.Delayed[opts.Config.Sharing.Service.Name] = sharing.NewSutureService
@@ -262,7 +264,7 @@ func (s *Service) generateRunSet(cfg *ociscfg.Config) {
}
// List running processes for the Service Controller.
func (s *Service) List(args struct{}, reply *string) error {
func (s *Service) List(_ struct{}, reply *string) error {
tableString := &strings.Builder{}
table := tablewriter.NewWriter(tableString)
table.SetHeader([]string{"Service"})
+37
View File
@@ -0,0 +1,37 @@
SHELL := bash
NAME := postprocessing
include ../../.make/recursion.mk
############ tooling ############
ifneq (, $(shell command -v go 2> /dev/null)) # suppress `command not found warnings` for non go targets in CI
include ../../.bingo/Variables.mk
endif
############ go tooling ############
include ../../.make/go.mk
############ release ############
include ../../.make/release.mk
############ docs generate ############
include ../../.make/docs.mk
.PHONY: docs-generate
docs-generate: config-docs-generate
############ generate ############
include ../../.make/generate.mk
.PHONY: ci-go-generate
ci-go-generate: # CI runs ci-node-generate automatically before this target
.PHONY: ci-node-generate
ci-node-generate:
############ licenses ############
.PHONY: ci-node-check-licenses
ci-node-check-licenses:
.PHONY: ci-node-save-licenses
ci-node-save-licenses:
@@ -0,0 +1,14 @@
package main
import (
"os"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/command"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config/defaults"
)
func main() {
if err := command.Execute(defaults.DefaultConfig()); err != nil {
os.Exit(1)
}
}
@@ -0,0 +1,18 @@
package command
import (
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
"github.com/urfave/cli/v2"
)
// Health is the entrypoint for the health command.
func Health(cfg *config.Config) *cli.Command {
return &cli.Command{
Name: "health",
Usage: "Check health status",
Action: func(c *cli.Context) error {
// Not implemented
return nil
},
}
}
@@ -0,0 +1,56 @@
package command
import (
"context"
"os"
"github.com/owncloud/ocis/v2/ocis-pkg/clihelper"
ociscfg "github.com/owncloud/ocis/v2/ocis-pkg/config"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
"github.com/thejerf/suture/v4"
"github.com/urfave/cli/v2"
)
// GetCommands provides all commands for this service
func GetCommands(cfg *config.Config) cli.Commands {
return []*cli.Command{
// start this service
Server(cfg),
// interaction with this service
// infos about this service
Health(cfg),
Version(cfg),
}
}
// Execute is the entry point for the postprocessing command.
func Execute(cfg *config.Config) error {
app := clihelper.DefaultApp(&cli.App{
Name: "postprocessing",
Usage: "starts postprocessing service",
Commands: GetCommands(cfg),
})
return app.Run(os.Args)
}
// SutureService allows for the postprocessing command to be embedded and supervised by a suture supervisor tree.
type SutureService struct {
cfg *config.Config
}
// NewSutureService creates a new postprocessing.SutureService
func NewSutureService(cfg *ociscfg.Config) suture.Service {
cfg.Postprocessing.Commons = cfg.Commons
return SutureService{
cfg: cfg.Postprocessing,
}
}
// Serve to implement Server interface
func (s SutureService) Serve(ctx context.Context) error {
s.cfg.Context = ctx
return Execute(s.cfg)
}
@@ -0,0 +1,75 @@
package command
import (
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"github.com/cs3org/reva/v2/pkg/events/server"
"github.com/go-micro/plugins/v4/events/natsjs"
ociscrypto "github.com/owncloud/ocis/v2/ocis-pkg/crypto"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config/parser"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/logging"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/service"
"github.com/urfave/cli/v2"
)
// Server is the entrypoint for the server command.
func Server(cfg *config.Config) *cli.Command {
return &cli.Command{
Name: "server",
Usage: fmt.Sprintf("start %s service without runtime (unsupervised mode)", cfg.Service.Name),
Category: "server",
Before: func(c *cli.Context) error {
err := parser.ParseConfig(cfg)
if err != nil {
fmt.Printf("%v", err)
os.Exit(1)
}
return err
},
Action: func(c *cli.Context) error {
logger := logging.Configure(cfg.Service.Name, cfg.Log)
evtsCfg := cfg.Postprocessing.Events
var tlsConf *tls.Config
if !evtsCfg.TLSInsecure {
var rootCAPool *x509.CertPool
if evtsCfg.TLSRootCACertificate != "" {
rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate)
if err != nil {
return err
}
rootCAPool, err = ociscrypto.NewCertPoolFromPEM(rootCrtFile)
if err != nil {
return err
}
evtsCfg.TLSInsecure = false
}
tlsConf = &tls.Config{
RootCAs: rootCAPool,
}
}
bus, err := server.NewNatsStream(
natsjs.TLSConfig(tlsConf),
natsjs.Address(evtsCfg.Endpoint),
natsjs.ClusterID(evtsCfg.Cluster),
)
if err != nil {
return err
}
svc, err := service.NewPostprocessingService(bus, logger, cfg.Postprocessing)
if err != nil {
return err
}
return svc.Run()
},
}
}
@@ -0,0 +1,19 @@
package command
import (
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
"github.com/urfave/cli/v2"
)
// Version prints the service versions of all running instances.
func Version(cfg *config.Config) *cli.Command {
return &cli.Command{
Name: "version",
Usage: "print the version of this binary and the running extension instances",
Category: "info",
Action: func(c *cli.Context) error {
// not implemented
return nil
},
}
}
@@ -0,0 +1,37 @@
package config
import (
"context"
"time"
"github.com/owncloud/ocis/v2/ocis-pkg/shared"
)
// Config combines all available configuration parts.
type Config struct {
Commons *shared.Commons `yaml:"-"` // don't use this directly as configuration for a service
Service Service `yaml:"-"`
Log *Log `yaml:"log"`
Postprocessing Postprocessing `yaml:"postprocessing"`
Context context.Context `yaml:"-"`
}
// Postprocessing definces the config options for the postprocessing service.
type Postprocessing struct {
Events Events `yaml:"events"`
Virusscan bool `yaml:"virusscan" env:"POSTPROCESSING_VIRUSSCAN" desc:"should the system do a virusscan? Needs antivirus service"`
Delayprocessing time.Duration `yaml:"delayprocessing" env:"POSTPROCESSING_DELAY" desc:"the sytem sleeps for this time while postprocessing"`
}
// Events combines the configuration options for the event bus.
type Events struct {
Endpoint string `yaml:"endpoint" env:"POSTPROCESSING_EVENTS_ENDPOINT" desc:"Endpoint of the event system."`
Cluster string `yaml:"cluster" env:"POSTPROCESSING_EVENTS_CLUSTER" desc:"Cluster ID of the event system."`
TLSInsecure bool `yaml:"tls_insecure" env:"OCIS_INSECURE;SEARCH_EVENTS_TLS_INSECURE" desc:"Whether to verify the server TLS certificates."`
TLSRootCACertificate string `yaml:"tls_root_ca_certificate" env:"SEARCH_EVENTS_TLS_ROOT_CA_CERTIFICATE" desc:"The root CA certificate used to validate the server's TLS certificate. If provided SEARCH_EVENTS_TLS_INSECURE will be seen as false."`
}
@@ -0,0 +1,47 @@
package defaults
import (
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
)
// FullDefaultConfig returns a full sanitized config
func FullDefaultConfig() *config.Config {
cfg := DefaultConfig()
EnsureDefaults(cfg)
Sanitize(cfg)
return cfg
}
// DefaultConfig is the default configuration
func DefaultConfig() *config.Config {
return &config.Config{
Service: config.Service{
Name: "postprocessing",
},
Postprocessing: config.Postprocessing{
Events: config.Events{
Endpoint: "127.0.0.1:9233",
Cluster: "ocis-cluster",
},
},
}
}
// EnsureDefaults ensures defaults on a config
func EnsureDefaults(cfg *config.Config) {
// provide with defaults for shared logging, since we need a valid destination address for BindEnv.
if cfg.Log == nil && cfg.Commons != nil && cfg.Commons.Log != nil {
cfg.Log = &config.Log{
Level: cfg.Commons.Log.Level,
Pretty: cfg.Commons.Log.Pretty,
Color: cfg.Commons.Log.Color,
File: cfg.Commons.Log.File,
}
} else if cfg.Log == nil {
cfg.Log = &config.Log{}
}
}
// Sanitize does nothing atm
func Sanitize(cfg *config.Config) {
}
@@ -0,0 +1,9 @@
package config
// Log defines the available log configuration.
type Log struct {
Level string `mapstructure:"level" env:"OCIS_LOG_LEVEL;POSTPROCESSING_LOG_LEVEL" desc:"The log level. Valid values are: \"panic\", \"fatal\", \"error\", \"warn\", \"info\", \"debug\", \"trace\"."`
Pretty bool `mapstructure:"pretty" env:"OCIS_LOG_PRETTY;POSTPROCESSING_LOG_PRETTY" desc:"Activates pretty log output."`
Color bool `mapstructure:"color" env:"OCIS_LOG_COLOR;POSTPROCESSING_LOG_COLOR" desc:"Activates colorized log output."`
File string `mapstructure:"file" env:"OCIS_LOG_FILE;POSTPROCESSING_LOG_FILE" desc:"The path to the log file. Activates logging to this file if set."`
}
@@ -0,0 +1,37 @@
package parser
import (
"errors"
ociscfg "github.com/owncloud/ocis/v2/ocis-pkg/config"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config/defaults"
"github.com/owncloud/ocis/v2/ocis-pkg/config/envdecode"
)
// ParseConfig loads configuration from known paths.
func ParseConfig(cfg *config.Config) error {
_, err := ociscfg.BindSourcesToStructs(cfg.Service.Name, cfg)
if err != nil {
return err
}
defaults.EnsureDefaults(cfg)
// load all env variables relevant to the config in the current context.
if err := envdecode.Decode(cfg); err != nil {
// no environment variable set for this config is an expected "error"
if !errors.Is(err, envdecode.ErrNoTargetFieldsAreSet) {
return err
}
}
defaults.Sanitize(cfg)
return Validate(cfg)
}
func Validate(cfg *config.Config) error {
return nil
}
@@ -0,0 +1,6 @@
package config
// Service defines the available service configuration.
type Service struct {
Name string `yaml:"-"`
}
@@ -0,0 +1,17 @@
package logging
import (
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
)
// LoggerFromConfig initializes a service-specific logger instance.
func Configure(name string, cfg *config.Log) log.Logger {
return log.NewLogger(
log.Name(name),
log.Level(cfg.Level),
log.Pretty(cfg.Pretty),
log.Color(cfg.Color),
log.File(cfg.File),
)
}
@@ -0,0 +1,118 @@
package postprocessing
import (
"time"
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
)
// Postprocessing handles postprocessing of a file
type Postprocessing struct {
id string
url string
u *user.User
m map[events.Postprocessingstep]interface{}
filename string
filesize uint64
resourceID *provider.ResourceId
c config.Postprocessing
steps []events.Postprocessingstep
}
// New returns a new postprocessing instance
func New(uploadID string, uploadURL string, user *user.User, filename string, filesize uint64, resourceID *provider.ResourceId, c config.Postprocessing) *Postprocessing {
return &Postprocessing{
id: uploadID,
url: uploadURL,
u: user,
m: make(map[events.Postprocessingstep]interface{}),
c: c,
filename: filename,
filesize: filesize,
resourceID: resourceID,
steps: getSteps(c),
}
}
// Init is the first step of the postprocessing
func (pp *Postprocessing) Init(ev events.BytesReceived) interface{} {
pp.m["init"] = ev
if len(pp.steps) == 0 {
return pp.finished(events.PPOutcomeContinue)
}
return pp.nextStep(pp.steps[0])
}
// Virusscan is the virusscanning step of the postprocessing
func (pp *Postprocessing) Virusscan(ev events.VirusscanFinished) interface{} {
pp.m[events.PPStepAntivirus] = ev
switch ev.Outcome {
case events.PPOutcomeContinue:
return pp.next(events.PPStepAntivirus)
default:
return pp.finished(ev.Outcome)
}
}
// Delay will sleep the configured time then continue
func (pp *Postprocessing) Delay(ev events.StartPostprocessingStep) interface{} {
pp.m[events.PPStepDelay] = ev
time.Sleep(pp.c.Delayprocessing)
return pp.next(events.PPStepDelay)
}
func (pp *Postprocessing) next(current events.Postprocessingstep) interface{} {
l := len(pp.steps)
for i, s := range pp.steps {
if s == current && i+1 < l {
return pp.nextStep(pp.steps[i+1])
}
}
return pp.finished(events.PPOutcomeContinue)
}
func (pp *Postprocessing) nextStep(next events.Postprocessingstep) events.StartPostprocessingStep {
return events.StartPostprocessingStep{
UploadID: pp.id,
URL: pp.url,
ExecutingUser: pp.u,
Filename: pp.filename,
Filesize: pp.filesize,
ResourceID: pp.resourceID,
StepToStart: next,
}
}
func (pp *Postprocessing) finished(outcome events.PostprocessingOutcome) events.PostprocessingFinished {
return events.PostprocessingFinished{
UploadID: pp.id,
Result: pp.m,
ExecutingUser: pp.u,
Filename: pp.filename,
Outcome: outcome,
}
}
func getSteps(c config.Postprocessing) []events.Postprocessingstep {
// NOTE: first version only contains very basic configuration options
// But we aim for a system where postprocessing steps and their order can be configured per space
// ideally by the spaceadmin itself
// We need to iterate over configuring PP service when we see fit
var steps []events.Postprocessingstep
if c.Delayprocessing != 0 {
steps = append(steps, events.PPStepDelay)
}
if c.Virusscan {
steps = append(steps, events.PPStepAntivirus)
}
return steps
}
@@ -0,0 +1,75 @@
package service
import (
"github.com/cs3org/reva/v2/pkg/events"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/postprocessing"
)
// PostprocessingService is an instance of the service handling postprocessing of files
type PostprocessingService struct {
log log.Logger
events <-chan interface{}
pub events.Publisher
c config.Postprocessing
}
// NewPostprocessingService returns a new instance of a postprocessing service
func NewPostprocessingService(stream events.Stream, logger log.Logger, c config.Postprocessing) (*PostprocessingService, error) {
evs, err := events.Consume(stream, "postprocessing",
events.BytesReceived{},
events.StartPostprocessingStep{},
events.VirusscanFinished{},
events.UploadReady{},
)
if err != nil {
return nil, err
}
return &PostprocessingService{
log: logger,
events: evs,
pub: stream,
c: c,
}, nil
}
// Run to fulfil Runner interface
func (pps *PostprocessingService) Run() error {
current := make(map[string]*postprocessing.Postprocessing)
for e := range pps.events {
var next interface{}
switch ev := e.(type) {
case events.BytesReceived:
pp := postprocessing.New(ev.UploadID, ev.URL, ev.ExecutingUser, ev.Filename, ev.Filesize, ev.ResourceID, pps.c)
current[ev.UploadID] = pp
next = pp.Init(ev)
case events.VirusscanFinished:
pp := current[ev.UploadID]
if pp == nil {
// no current upload - this was an on demand scan
continue
}
next = pp.Virusscan(ev)
case events.StartPostprocessingStep:
if ev.StepToStart != events.PPStepDelay {
continue
}
pp := current[ev.UploadID]
next = pp.Delay(ev)
case events.UploadReady:
// the storage provider thinks the upload is done - so no need to keep it any more
delete(current, ev.UploadID)
}
if next != nil {
if err := events.Publish(pps.pub, next); err != nil {
pps.log.Error().Err(err).Msg("unable to publish event")
return err // we can't publish -> we are screwed
}
}
}
return nil
}
@@ -24,6 +24,8 @@ type Config struct {
Driver string `yaml:"driver" env:"STORAGE_USERS_DRIVER" desc:"The storage driver which should be used by the service"`
Drivers Drivers `yaml:"drivers"`
DataServerURL string `yaml:"data_server_url" env:"STORAGE_USERS_DATA_SERVER_URL" desc:"URL of the data server, needs to be reachable by the data gateway provided by the frontend service or the user if directly exposed."`
DataGatewayURL string `yaml:"data_gateway_url" env:"STORAGE_USERS_DATA_GATEWAY_URL" desc:"URL of the data gateway server"`
TransferExpires int64 `yaml:"transfer_expires" env:"STORAGE_USERS_TRANSFER_EXPIRES" desc:"the time after which the token for upload postprocessing expires"`
Events Events `yaml:"events"`
Cache Cache `yaml:"cache"`
MountID string `yaml:"mount_id" env:"STORAGE_USERS_MOUNT_ID" desc:"Mount ID of this storage."`
@@ -98,6 +100,7 @@ type OCISDriver struct {
ShareFolder string `yaml:"share_folder" env:"STORAGE_USERS_OCIS_SHARE_FOLDER" desc:"Name of the folder jailing all shares."`
MaxAcquireLockCycles int `yaml:"max_acquire_lock_cycles" env:"STORAGE_USERS_OCIS_MAX_ACQUIRE_LOCK_CYCLES" desc:"When trying to lock files, ocis will try this amount of times to acquire the lock before failing. After each try it will wait for an increasing amount of time. Values of 0 or below will be ignored and the default value of 20 will be used."`
LockCycleDurationFactor int `yaml:"lock_cycle_duration_factor" env:"STORAGE_USERS_OCIS_LOCK_CYCLE_DURATION_FACTOR" desc:"When trying to lock files, ocis will multiply the cycle with this factor and use it as a millisecond timeout. Values of 0 or below will be ignored and the default value of 30 will be used."`
AsyncUploads bool `yaml:"async_uploads" env:"STORAGE_USERS_OCIS_ASYNC_UPLOADS" desc:"Enable asynchronous file uploads."`
}
type S3NGDriver struct {
@@ -143,6 +146,7 @@ type Events struct {
TLSInsecure bool `yaml:"tls_insecure" env:"OCIS_INSECURE;STORAGE_USERS_EVENTS_TLS_INSECURE" desc:"Whether to verify the server TLS certificates."`
TLSRootCaCertPath string `yaml:"tls_root_ca_cert_path" env:"STORAGE_USERS_EVENTS_TLS_ROOT_CA_CERT" desc:"The root CA certificate used to validate the server's TLS certificate. If provided STORAGE_USERS_EVENTS_TLS_INSECURE will be seen as false."`
EnableTLS bool `yaml:"enable_tls" env:"OCIS_EVENTS_ENABLE_TLS;STORAGE_USERS_EVENTS_ENABLE_TLS" desc:"Enable TLS for the connection to the events broker. The events broker is the ocis service which receives and delivers events between the services.."`
NumConsumers int `yaml:"num_consumers" env:"STORAGE_USERS_EVENTS_NUM_CONSUMERS" desc:"The amount of concurrent event consumers to start. Event consumers are used for post-processing files. Multiple consumers increase parallelisation, but will also increase CPU and memory demands. The setting has no effect when the STORAGE_USERS_OCIS_ASYNC_UPLOADS is set to false. The default and minimum value is 1."`
}
// Cache holds cache config
@@ -39,6 +39,8 @@ func DefaultConfig() *config.Config {
},
Reva: shared.DefaultRevaConfig(),
DataServerURL: "http://localhost:9158/data",
DataGatewayURL: "https://localhost:9200/data",
TransferExpires: 86400,
UploadExpiration: 24 * 60 * 60,
Driver: "ocis",
Drivers: config.Drivers{
@@ -98,6 +98,25 @@ func UserDrivers(cfg *config.Config) map[string]interface{} {
"permissionssvc_tls_mode": cfg.Commons.GRPCClientTLS.Mode,
"max_acquire_lock_cycles": cfg.Drivers.OCIS.MaxAcquireLockCycles,
"lock_cycle_duration_factor": cfg.Drivers.OCIS.LockCycleDurationFactor,
"asyncfileuploads": cfg.Drivers.OCIS.AsyncUploads,
"statcache": map[string]interface{}{
"cache_store": cfg.Cache.Store,
"cache_nodes": cfg.Cache.Nodes,
"cache_database": cfg.Cache.Database,
},
"events": map[string]interface{}{
"natsaddress": cfg.Events.Addr,
"natsclusterid": cfg.Events.ClusterID,
"tlsinsecure": cfg.Events.TLSInsecure,
"tlsrootcacertificate": cfg.Events.TLSRootCaCertPath,
"numconsumers": cfg.Events.NumConsumers,
},
"tokens": map[string]interface{}{
"transfer_shared_secret": cfg.Commons.TransferSecret,
"transfer_expires": cfg.TransferExpires,
"download_endpoint": cfg.DataServerURL,
"datagateway_endpoint": cfg.DataGatewayURL,
},
},
"s3": map[string]interface{}{
"enable_home": false,
@@ -125,6 +144,25 @@ func UserDrivers(cfg *config.Config) map[string]interface{} {
"s3.bucket": cfg.Drivers.S3NG.Bucket,
"max_acquire_lock_cycles": cfg.Drivers.S3NG.MaxAcquireLockCycles,
"lock_cycle_duration_factor": cfg.Drivers.S3NG.LockCycleDurationFactor,
"asyncfileuploads": cfg.Drivers.OCIS.AsyncUploads,
"statcache": map[string]interface{}{
"cache_store": cfg.Cache.Store,
"cache_nodes": cfg.Cache.Nodes,
"cache_database": cfg.Cache.Database,
},
"events": map[string]interface{}{
"natsaddress": cfg.Events.Addr,
"natsclusterid": cfg.Events.ClusterID,
"tlsinsecure": cfg.Events.TLSInsecure,
"tlsrootcacertificate": cfg.Events.TLSRootCaCertPath,
"numconsumers": cfg.Events.NumConsumers,
},
"tokens": map[string]interface{}{
"transfer_shared_secret": cfg.Commons.TransferSecret,
"transfer_expires": cfg.TransferExpires,
"download_endpoint": cfg.DataServerURL,
"datagateway_endpoint": cfg.DataGatewayURL,
},
},
}
}