opencloud/services/postprocessing/pkg/service/service.go
Daniël Franke 97034f4aaa Add tracing to postprocessing service (#7094)
This adds tracing to the postprocessing service.
2023-08-23 15:52:26 +02:00

package service

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"

	"github.com/cs3org/reva/v2/pkg/events"
	"github.com/owncloud/ocis/v2/ocis-pkg/log"
	"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
	"github.com/owncloud/ocis/v2/services/postprocessing/pkg/postprocessing"
	"go-micro.dev/v4/store"
	"go.opentelemetry.io/otel/trace"
)
// PostprocessingService is an instance of the service handling postprocessing of files
type PostprocessingService struct {
	ctx    context.Context
	log    log.Logger
	events <-chan events.Event
	pub    events.Publisher
	steps  []events.Postprocessingstep
	store  store.Store
	c      config.Postprocessing
	tp     trace.TracerProvider
}
var (
	// errFatal is returned when a fatal error occurs and we want to exit.
	errFatal = errors.New("fatal error")
	// errEvent is returned when something went wrong with a specific event.
	errEvent = errors.New("event error")
)
// NewPostprocessingService returns a new instance of a postprocessing service
func NewPostprocessingService(ctx context.Context, stream events.Stream, logger log.Logger, sto store.Store, tp trace.TracerProvider, c config.Postprocessing) (*PostprocessingService, error) {
	// subscribe to all events the postprocessing service reacts to
	evs, err := events.Consume(stream, "postprocessing",
		events.BytesReceived{},
		events.StartPostprocessingStep{},
		events.UploadReady{},
		events.PostprocessingStepFinished{},
		events.ResumePostprocessing{},
	)
	if err != nil {
		return nil, err
	}

	return &PostprocessingService{
		ctx:    ctx,
		log:    logger,
		events: evs,
		pub:    stream,
		steps:  getSteps(c),
		store:  sto,
		c:      c,
		tp:     tp,
	}, nil
}

// Run fulfils the Runner interface. It consumes events until the channel is closed.
func (pps *PostprocessingService) Run() error {
	for e := range pps.events {
		err := pps.processEvent(e)
		if err != nil {
			switch {
			case errors.Is(err, errFatal):
				return err
			case errors.Is(err, errEvent):
				// event-scoped error, already logged in processEvent - keep consuming
				continue
			default:
				pps.log.Fatal().Err(err).Msg("unknown error - exiting")
			}
		}
	}
	return nil
}
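
// processEvent dispatches a single event to the matching postprocessing transition,
// persists the updated postprocessing state and publishes the follow-up event, if any.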
func (pps *PostprocessingService) processEvent(e events.Event) error {
	var (
		next interface{}
		pp   *postprocessing.Postprocessing
		err  error
	)

	ctx := e.GetTraceContext(pps.ctx)
	ctx, span := pps.tp.Tracer("postprocessing").Start(ctx, "processEvent")
	defer span.End()

	switch ev := e.Event.(type) {
	case events.BytesReceived:
		// bytes for a new upload have arrived - initialize postprocessing for it
		pp = postprocessing.New(ev.UploadID, ev.URL, ev.ExecutingUser, ev.Filename, ev.Filesize, ev.ResourceID, pps.steps, pps.c.Delayprocessing)
		next = pp.Init(ev)
	case events.PostprocessingStepFinished:
		if ev.UploadID == "" {
			// no current upload - this was an on demand scan
			return nil
		}
		pp, err = getPP(pps.store, ev.UploadID)
		if err != nil {
			pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
			return fmt.Errorf("%w: cannot get upload", errEvent)
		}
		next = pp.NextStep(ev)
	case events.StartPostprocessingStep:
		// only the delay step is executed by this service itself
		if ev.StepToStart != events.PPStepDelay {
			return nil
		}
		pp, err = getPP(pps.store, ev.UploadID)
		if err != nil {
			pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
			return fmt.Errorf("%w: cannot get upload", errEvent)
		}
		next = pp.Delay(ev)
	case events.UploadReady:
		// the storage provider thinks the upload is done - so no need to keep it any more
		if err := pps.store.Delete(ev.UploadID); err != nil {
			pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot delete upload")
			return fmt.Errorf("%w: cannot delete upload", errEvent)
		}
	case events.ResumePostprocessing:
		pp, err = getPP(pps.store, ev.UploadID)
		if err != nil {
			if err == store.ErrNotFound {
				// no stored state for this upload - ask for postprocessing to be restarted from scratch
				if err := events.Publish(ctx, pps.pub, events.RestartPostprocessing{
					UploadID:  ev.UploadID,
					Timestamp: ev.Timestamp,
				}); err != nil {
					pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot publish RestartPostprocessing event")
				}
				return fmt.Errorf("%w: cannot publish RestartPostprocessing event", errEvent)
			}
			pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
			return fmt.Errorf("%w: cannot get upload", errEvent)
		}
		next = pp.CurrentStep()
	}

	if pp != nil {
		if err := storePP(pps.store, pp); err != nil {
			pps.log.Error().Str("uploadID", pp.ID).Err(err).Msg("cannot store upload")
			return fmt.Errorf("%w: cannot store upload", errEvent)
		}
	}

	if next != nil {
		if err := events.Publish(ctx, pps.pub, next); err != nil {
			pps.log.Error().Err(err).Msg("unable to publish event")
			return fmt.Errorf("%w: unable to publish event", errFatal) // we can't publish -> we are screwed
		}
	}

	return nil
}
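
// getSteps converts the step names from the configuration into typed postprocessing steps.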
func getSteps(c config.Postprocessing) []events.Postprocessingstep {
	// NOTE: the current version only allows configuring the order of postprocessing steps.
	// The goal is a system where postprocessing steps can be configured per space, ideally by the space admin.
	// Configuration of the postprocessing service will need to be iterated on as requirements evolve.
	var steps []events.Postprocessingstep
	for _, s := range c.Steps {
		steps = append(steps, events.Postprocessingstep(s))
	}
	return steps
}
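
// storePP marshals the postprocessing state and persists it in the store under the upload ID.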
func storePP(sto store.Store, pp *postprocessing.Postprocessing) error {
	b, err := json.Marshal(pp)
	if err != nil {
		return err
	}

	return sto.Write(&store.Record{
		Key:   pp.ID,
		Value: b,
	})
}
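
// getPP reads the stored postprocessing state for the given upload ID and unmarshals it.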
func getPP(sto store.Store, uploadID string) (*postprocessing.Postprocessing, error) {
	recs, err := sto.Read(uploadID)
	if err != nil {
		return nil, err
	}

	if len(recs) != 1 {
		return nil, fmt.Errorf("expected only one result for '%s', got %d", uploadID, len(recs))
	}

	var pp postprocessing.Postprocessing
	return &pp, json.Unmarshal(recs[0].Value, &pp)
}