Files
opencloud/services/postprocessing/pkg/service/service.go
Florian Schade f38a9f4385 Introduce Policies-Service (#5716)
* add policies service
add policies proxy middleware
add policies event service
add policies grpc service
prepare ci and git environments (ci, make, readme, doc)

* add webfinger to the drone conf

* fix docs
remove not used virus scan postprocessing step

* relocate example rego file
implicitly enable and disable proxy and postprocessing policy checking by setting the query.
update configuration descriptions

* move policies
update readme

* use converter func to convert pp environment to actual environment
expose and test custom rego functions
add engine unit tests
add opa unit tests
update policies readme

Co-authored-by: Martin <github@diemattels.at>

* relocate sample policies to the deployments folder
change and document policies service port

* update index.md and small fix

* add health command
add version command
add debug server

---------

Co-authored-by: Martin <github@diemattels.at>
2023-03-14 16:08:22 +01:00

91 lines
2.7 KiB
Go

package service
import (
"github.com/cs3org/reva/v2/pkg/events"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/postprocessing"
)
// PostprocessingService is an instance of the service handling postprocessing of files
type PostprocessingService struct {
log log.Logger
events <-chan events.Event
pub events.Publisher
steps []events.Postprocessingstep
c config.Postprocessing
}
// NewPostprocessingService returns a new instance of a postprocessing service
func NewPostprocessingService(stream events.Stream, logger log.Logger, c config.Postprocessing) (*PostprocessingService, error) {
evs, err := events.Consume(stream, "postprocessing",
events.BytesReceived{},
events.StartPostprocessingStep{},
events.VirusscanFinished{},
events.UploadReady{},
events.PostprocessingStepFinished{},
)
if err != nil {
return nil, err
}
return &PostprocessingService{
log: logger,
events: evs,
pub: stream,
steps: getSteps(c),
c: c,
}, nil
}
// Run to fulfil Runner interface
func (pps *PostprocessingService) Run() error {
current := make(map[string]*postprocessing.Postprocessing)
for e := range pps.events {
var next interface{}
switch ev := e.Event.(type) {
case events.BytesReceived:
pp := postprocessing.New(ev.UploadID, ev.URL, ev.ExecutingUser, ev.Filename, ev.Filesize, ev.ResourceID, pps.steps, pps.c.Delayprocessing)
current[ev.UploadID] = pp
next = pp.Init(ev)
case events.PostprocessingStepFinished:
pp := current[ev.UploadID]
if pp == nil {
// no current upload - this was an on demand scan
continue
}
next = pp.NextStep(ev)
case events.StartPostprocessingStep:
if ev.StepToStart != events.PPStepDelay {
continue
}
pp := current[ev.UploadID]
next = pp.Delay(ev)
case events.UploadReady:
// the storage provider thinks the upload is done - so no need to keep it any more
delete(current, ev.UploadID)
}
if next != nil {
if err := events.Publish(pps.pub, next); err != nil {
pps.log.Error().Err(err).Msg("unable to publish event")
return err // we can't publish -> we are screwed
}
}
}
return nil
}
func getSteps(c config.Postprocessing) []events.Postprocessingstep {
// NOTE: improved version only allows configuring order of postprocessing steps
// But we aim for a system where postprocessing steps can be configured per space, ideally by the spaceadmin itself
// We need to iterate over configuring PP service when we see fit
var steps []events.Postprocessingstep
for _, s := range c.Steps {
steps = append(steps, events.Postprocessingstep(s))
}
return steps
}