mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-01-04 11:19:39 -06:00
159 lines
4.4 KiB
Go
159 lines
4.4 KiB
Go
package service
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
|
|
"github.com/cs3org/reva/v2/pkg/events"
|
|
"github.com/owncloud/ocis/v2/ocis-pkg/log"
|
|
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
|
|
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/postprocessing"
|
|
"go-micro.dev/v4/store"
|
|
)
|
|
|
|
// PostprocessingService is an instance of the service handling postprocessing of files
|
|
type PostprocessingService struct {
|
|
log log.Logger
|
|
events <-chan events.Event
|
|
pub events.Publisher
|
|
steps []events.Postprocessingstep
|
|
store store.Store
|
|
c config.Postprocessing
|
|
}
|
|
|
|
// NewPostprocessingService returns a new instance of a postprocessing service
|
|
func NewPostprocessingService(stream events.Stream, logger log.Logger, sto store.Store, c config.Postprocessing) (*PostprocessingService, error) {
|
|
evs, err := events.Consume(stream, "postprocessing",
|
|
events.BytesReceived{},
|
|
events.StartPostprocessingStep{},
|
|
events.UploadReady{},
|
|
events.PostprocessingStepFinished{},
|
|
events.ResumePostprocessing{},
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &PostprocessingService{
|
|
log: logger,
|
|
events: evs,
|
|
pub: stream,
|
|
steps: getSteps(c),
|
|
store: sto,
|
|
c: c,
|
|
}, nil
|
|
}
|
|
|
|
// Run to fulfil Runner interface
|
|
func (pps *PostprocessingService) Run() error {
|
|
for e := range pps.events {
|
|
var (
|
|
next interface{}
|
|
pp *postprocessing.Postprocessing
|
|
err error
|
|
)
|
|
|
|
switch ev := e.Event.(type) {
|
|
case events.BytesReceived:
|
|
pp = postprocessing.New(ev.UploadID, ev.URL, ev.ExecutingUser, ev.Filename, ev.Filesize, ev.ResourceID, pps.steps, pps.c.Delayprocessing)
|
|
next = pp.Init(ev)
|
|
case events.PostprocessingStepFinished:
|
|
if ev.UploadID == "" {
|
|
// no current upload - this was an on demand scan
|
|
continue
|
|
}
|
|
pp, err = getPP(pps.store, ev.UploadID)
|
|
if err != nil {
|
|
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
|
|
continue
|
|
}
|
|
next = pp.NextStep(ev)
|
|
case events.StartPostprocessingStep:
|
|
if ev.StepToStart != events.PPStepDelay {
|
|
continue
|
|
}
|
|
pp, err = getPP(pps.store, ev.UploadID)
|
|
if err != nil {
|
|
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
|
|
continue
|
|
}
|
|
next = pp.Delay(ev)
|
|
case events.UploadReady:
|
|
// the storage provider thinks the upload is done - so no need to keep it any more
|
|
if err := pps.store.Delete(ev.UploadID); err != nil {
|
|
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot delete upload")
|
|
continue
|
|
}
|
|
case events.ResumePostprocessing:
|
|
pp, err = getPP(pps.store, ev.UploadID)
|
|
if err != nil {
|
|
if err == store.ErrNotFound {
|
|
if err := events.Publish(pps.pub, events.RestartPostprocessing{
|
|
UploadID: ev.UploadID,
|
|
Timestamp: ev.Timestamp,
|
|
}); err != nil {
|
|
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot publish RestartPostprocessing event")
|
|
}
|
|
continue
|
|
}
|
|
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
|
|
continue
|
|
}
|
|
next = pp.CurrentStep()
|
|
}
|
|
|
|
if pp != nil {
|
|
if err := storePP(pps.store, pp); err != nil {
|
|
pps.log.Error().Str("uploadID", pp.ID).Err(err).Msg("cannot store upload")
|
|
continue // TODO: should we really continue here?
|
|
}
|
|
}
|
|
if next != nil {
|
|
if err := events.Publish(pps.pub, next); err != nil {
|
|
pps.log.Error().Err(err).Msg("unable to publish event")
|
|
return err // we can't publish -> we are screwed
|
|
}
|
|
}
|
|
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func getSteps(c config.Postprocessing) []events.Postprocessingstep {
|
|
// NOTE: improved version only allows configuring order of postprocessing steps
|
|
// But we aim for a system where postprocessing steps can be configured per space, ideally by the spaceadmin itself
|
|
// We need to iterate over configuring PP service when we see fit
|
|
var steps []events.Postprocessingstep
|
|
for _, s := range c.Steps {
|
|
steps = append(steps, events.Postprocessingstep(s))
|
|
}
|
|
|
|
return steps
|
|
}
|
|
|
|
func storePP(sto store.Store, pp *postprocessing.Postprocessing) error {
|
|
b, err := json.Marshal(pp)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return sto.Write(&store.Record{
|
|
Key: pp.ID,
|
|
Value: b,
|
|
})
|
|
}
|
|
|
|
func getPP(sto store.Store, uploadID string) (*postprocessing.Postprocessing, error) {
|
|
recs, err := sto.Read(uploadID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(recs) != 1 {
|
|
return nil, fmt.Errorf("expected only one result for '%s', got %d", uploadID, len(recs))
|
|
}
|
|
|
|
var pp postprocessing.Postprocessing
|
|
return &pp, json.Unmarshal(recs[0].Value, &pp)
|
|
}
|