diff --git a/services/postprocessing/pkg/command/server.go b/services/postprocessing/pkg/command/server.go index 35b0a55cc..1cbeafacb 100644 --- a/services/postprocessing/pkg/command/server.go +++ b/services/postprocessing/pkg/command/server.go @@ -6,7 +6,6 @@ import ( "os" "github.com/oklog/run" - "github.com/opencloud-eu/reva/v2/pkg/events/stream" "github.com/opencloud-eu/reva/v2/pkg/store" "github.com/urfave/cli/v2" microstore "go-micro.dev/v4/store" @@ -47,11 +46,6 @@ func Server(cfg *config.Config) *cli.Command { } { - bus, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Postprocessing.Events)) - if err != nil { - return err - } - st := store.Create( store.Store(cfg.Store.Store), store.TTL(cfg.Store.TTL), @@ -61,7 +55,7 @@ func Server(cfg *config.Config) *cli.Command { store.Authentication(cfg.Store.AuthUsername, cfg.Store.AuthPassword), ) - svc, err := service.NewPostprocessingService(ctx, bus, logger, st, traceProvider, cfg.Postprocessing) + svc, err := service.NewPostprocessingService(ctx, logger, st, traceProvider, cfg) if err != nil { return err } diff --git a/services/postprocessing/pkg/service/service.go b/services/postprocessing/pkg/service/service.go index 9b8eb6a05..8782757ac 100644 --- a/services/postprocessing/pkg/service/service.go +++ b/services/postprocessing/pkg/service/service.go @@ -13,6 +13,8 @@ import ( "github.com/opencloud-eu/opencloud/services/postprocessing/pkg/postprocessing" ctxpkg "github.com/opencloud-eu/reva/v2/pkg/ctx" "github.com/opencloud-eu/reva/v2/pkg/events" + "github.com/opencloud-eu/reva/v2/pkg/events/raw" + "github.com/opencloud-eu/reva/v2/pkg/events/stream" "github.com/opencloud-eu/reva/v2/pkg/utils" "go-micro.dev/v4/store" "go.opentelemetry.io/otel/trace" @@ -22,7 +24,7 @@ import ( type PostprocessingService struct { ctx context.Context log log.Logger - events <-chan events.Event + events <-chan raw.Event pub events.Publisher steps []events.Postprocessingstep store store.Store @@ -40,14 +42,27 @@ var ( ) // NewPostprocessingService returns a new instance of a postprocessing service -func NewPostprocessingService(ctx context.Context, stream events.Stream, logger log.Logger, sto store.Store, tp trace.TracerProvider, c config.Postprocessing) (*PostprocessingService, error) { - evs, err := events.Consume(stream, "postprocessing", +func NewPostprocessingService(ctx context.Context, logger log.Logger, sto store.Store, tp trace.TracerProvider, cfg *config.Config) (*PostprocessingService, error) { + pub, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Postprocessing.Events)) + if err != nil { + return nil, err + } + raw, err := raw.FromConfig(ctx, cfg.Service.Name, raw.Config{ + Endpoint: cfg.Postprocessing.Events.Endpoint, + Cluster: cfg.Postprocessing.Events.Cluster, + EnableTLS: cfg.Postprocessing.Events.EnableTLS, + TLSInsecure: cfg.Postprocessing.Events.TLSInsecure, + TLSRootCACertificate: cfg.Postprocessing.Events.TLSRootCACertificate, + AuthUsername: cfg.Postprocessing.Events.AuthUsername, + AuthPassword: cfg.Postprocessing.Events.AuthPassword, + }) + + evs, err := raw.Consume("postprocessing-pull", events.BytesReceived{}, events.StartPostprocessingStep{}, events.UploadReady{}, events.PostprocessingStepFinished{}, - events.ResumePostprocessing{}, - ) + events.ResumePostprocessing{}) if err != nil { return nil, err } @@ -56,10 +71,10 @@ func NewPostprocessingService(ctx context.Context, stream events.Stream, logger ctx: ctx, log: logger, events: evs, - pub: stream, - steps: getSteps(c), + pub: pub, + steps: getSteps(cfg.Postprocessing), store: sto, - c: c, + c: cfg.Postprocessing, tp: tp, }, nil } @@ -91,7 +106,7 @@ func (pps *PostprocessingService) Run() error { return nil } -func (pps *PostprocessingService) processEvent(e events.Event) error { +func (pps *PostprocessingService) processEvent(e raw.Event) error { var ( next interface{} pp *postprocessing.Postprocessing @@ -102,7 +117,16 @@ func (pps *PostprocessingService) processEvent(e events.Event) error { ctx, span := pps.tp.Tracer("postprocessing").Start(ctx, "processEvent") defer span.End() - switch ev := e.Event.(type) { + ackEvent := true + defer func() { + if ackEvent { + if err := e.Ack(); err != nil { + pps.log.Error().Err(err).Msg("unable to ack event") + } + } + }() + + switch ev := e.Event.Event.(type) { case events.BytesReceived: pp = &postprocessing.Postprocessing{ ID: ev.UploadID, @@ -189,6 +213,7 @@ func (pps *PostprocessingService) processEvent(e events.Event) error { ctx = ctxpkg.ContextSetInitiator(ctx, pp.InitiatorID) if err := storePP(pps.store, pp); err != nil { + ackEvent = false pps.log.Error().Str("uploadID", pp.ID).Err(err).Msg("cannot store upload") return fmt.Errorf("%w: cannot store upload", ErrEvent) }