mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2025-12-31 01:10:20 -06:00
Only ack postprocessing events after persisting
This commit is contained in:
@@ -6,7 +6,6 @@ import (
|
||||
"os"
|
||||
|
||||
"github.com/oklog/run"
|
||||
"github.com/opencloud-eu/reva/v2/pkg/events/stream"
|
||||
"github.com/opencloud-eu/reva/v2/pkg/store"
|
||||
"github.com/urfave/cli/v2"
|
||||
microstore "go-micro.dev/v4/store"
|
||||
@@ -47,11 +46,6 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
}
|
||||
|
||||
{
|
||||
bus, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Postprocessing.Events))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
st := store.Create(
|
||||
store.Store(cfg.Store.Store),
|
||||
store.TTL(cfg.Store.TTL),
|
||||
@@ -61,7 +55,7 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
store.Authentication(cfg.Store.AuthUsername, cfg.Store.AuthPassword),
|
||||
)
|
||||
|
||||
svc, err := service.NewPostprocessingService(ctx, bus, logger, st, traceProvider, cfg.Postprocessing)
|
||||
svc, err := service.NewPostprocessingService(ctx, logger, st, traceProvider, cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -13,6 +13,8 @@ import (
|
||||
"github.com/opencloud-eu/opencloud/services/postprocessing/pkg/postprocessing"
|
||||
ctxpkg "github.com/opencloud-eu/reva/v2/pkg/ctx"
|
||||
"github.com/opencloud-eu/reva/v2/pkg/events"
|
||||
"github.com/opencloud-eu/reva/v2/pkg/events/raw"
|
||||
"github.com/opencloud-eu/reva/v2/pkg/events/stream"
|
||||
"github.com/opencloud-eu/reva/v2/pkg/utils"
|
||||
"go-micro.dev/v4/store"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
@@ -22,7 +24,7 @@ import (
|
||||
type PostprocessingService struct {
|
||||
ctx context.Context
|
||||
log log.Logger
|
||||
events <-chan events.Event
|
||||
events <-chan raw.Event
|
||||
pub events.Publisher
|
||||
steps []events.Postprocessingstep
|
||||
store store.Store
|
||||
@@ -40,14 +42,27 @@ var (
|
||||
)
|
||||
|
||||
// NewPostprocessingService returns a new instance of a postprocessing service
|
||||
func NewPostprocessingService(ctx context.Context, stream events.Stream, logger log.Logger, sto store.Store, tp trace.TracerProvider, c config.Postprocessing) (*PostprocessingService, error) {
|
||||
evs, err := events.Consume(stream, "postprocessing",
|
||||
func NewPostprocessingService(ctx context.Context, logger log.Logger, sto store.Store, tp trace.TracerProvider, cfg *config.Config) (*PostprocessingService, error) {
|
||||
pub, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Postprocessing.Events))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
raw, err := raw.FromConfig(ctx, cfg.Service.Name, raw.Config{
|
||||
Endpoint: cfg.Postprocessing.Events.Endpoint,
|
||||
Cluster: cfg.Postprocessing.Events.Cluster,
|
||||
EnableTLS: cfg.Postprocessing.Events.EnableTLS,
|
||||
TLSInsecure: cfg.Postprocessing.Events.TLSInsecure,
|
||||
TLSRootCACertificate: cfg.Postprocessing.Events.TLSRootCACertificate,
|
||||
AuthUsername: cfg.Postprocessing.Events.AuthUsername,
|
||||
AuthPassword: cfg.Postprocessing.Events.AuthPassword,
|
||||
})
|
||||
|
||||
evs, err := raw.Consume("postprocessing-pull",
|
||||
events.BytesReceived{},
|
||||
events.StartPostprocessingStep{},
|
||||
events.UploadReady{},
|
||||
events.PostprocessingStepFinished{},
|
||||
events.ResumePostprocessing{},
|
||||
)
|
||||
events.ResumePostprocessing{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -56,10 +71,10 @@ func NewPostprocessingService(ctx context.Context, stream events.Stream, logger
|
||||
ctx: ctx,
|
||||
log: logger,
|
||||
events: evs,
|
||||
pub: stream,
|
||||
steps: getSteps(c),
|
||||
pub: pub,
|
||||
steps: getSteps(cfg.Postprocessing),
|
||||
store: sto,
|
||||
c: c,
|
||||
c: cfg.Postprocessing,
|
||||
tp: tp,
|
||||
}, nil
|
||||
}
|
||||
@@ -91,7 +106,7 @@ func (pps *PostprocessingService) Run() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pps *PostprocessingService) processEvent(e events.Event) error {
|
||||
func (pps *PostprocessingService) processEvent(e raw.Event) error {
|
||||
var (
|
||||
next interface{}
|
||||
pp *postprocessing.Postprocessing
|
||||
@@ -102,7 +117,16 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
|
||||
ctx, span := pps.tp.Tracer("postprocessing").Start(ctx, "processEvent")
|
||||
defer span.End()
|
||||
|
||||
switch ev := e.Event.(type) {
|
||||
ackEvent := true
|
||||
defer func() {
|
||||
if ackEvent {
|
||||
if err := e.Ack(); err != nil {
|
||||
pps.log.Error().Err(err).Msg("unable to ack event")
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
switch ev := e.Event.Event.(type) {
|
||||
case events.BytesReceived:
|
||||
pp = &postprocessing.Postprocessing{
|
||||
ID: ev.UploadID,
|
||||
@@ -189,6 +213,7 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
|
||||
ctx = ctxpkg.ContextSetInitiator(ctx, pp.InitiatorID)
|
||||
|
||||
if err := storePP(pps.store, pp); err != nil {
|
||||
ackEvent = false
|
||||
pps.log.Error().Str("uploadID", pp.ID).Err(err).Msg("cannot store upload")
|
||||
return fmt.Errorf("%w: cannot store upload", ErrEvent)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user