add a store to postprocessing

Signed-off-by: jkoberg <jkoberg@owncloud.com>
This commit is contained in:
jkoberg
2023-05-11 13:05:49 +02:00
parent 7d8a245d37
commit a324ad48a0
6 changed files with 131 additions and 48 deletions

View File

@@ -8,6 +8,7 @@ import (
"os"
"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/cs3org/reva/v2/pkg/store"
"github.com/go-micro/plugins/v4/events/natsjs"
"github.com/oklog/run"
ociscrypto "github.com/owncloud/ocis/v2/ocis-pkg/crypto"
@@ -19,6 +20,7 @@ import (
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/logging"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/service"
"github.com/urfave/cli/v2"
microstore "go-micro.dev/v4/store"
)
// Server is the entrypoint for the server command.
@@ -82,7 +84,16 @@ func Server(cfg *config.Config) *cli.Command {
return err
}
svc, err := service.NewPostprocessingService(bus, logger, cfg.Postprocessing)
st := store.Create(
store.Store(cfg.Store.Store),
store.TTL(cfg.Store.TTL),
store.Size(cfg.Store.Size),
microstore.Nodes(cfg.Store.Nodes...),
microstore.Database(cfg.Store.Database),
microstore.Table(cfg.Store.Table),
)
svc, err := service.NewPostprocessingService(bus, logger, st, cfg.Postprocessing)
if err != nil {
return err
}

View File

@@ -17,6 +17,7 @@ type Config struct {
Log *Log `yaml:"log"`
Debug Debug `yaml:"debug"`
Store Store `yaml:"store"`
Postprocessing Postprocessing `yaml:"postprocessing"`
Context context.Context `yaml:"-"`
@@ -55,3 +56,13 @@ type Tracing struct {
Endpoint string `yaml:"endpoint" env:"OCIS_TRACING_ENDPOINT;POSTPROCESSING_TRACING_ENDPOINT" desc:"The endpoint of the tracing agent."`
Collector string `yaml:"collector" env:"OCIS_TRACING_COLLECTOR;POSTPROCESSING_TRACING_COLLECTOR" desc:"The HTTP endpoint for sending spans directly to a collector, i.e. http://jaeger-collector:14268/api/traces. Only used if the tracing endpoint is unset."`
}
// Store configures the store to use
type Store struct {
Store string `yaml:"store" env:"OCIS_PERSISTENT_STORE;POSTPROCESSING_STORE" desc:"The type of the store. Supported values are: 'memory', 'ocmem', 'etcd', 'redis', 'redis-sentinel', 'nats-js', 'noop'. See the text description for details."`
Nodes []string `yaml:"nodes" env:"OCIS_PERSISTENT_STORE_NODES;POSTPROCESSING_STORE_NODES" desc:"A comma separated list of nodes to access the configured store. This has no effect when 'memory' or 'ocmem' stores are configured. Note that the behaviour how nodes are used is dependent on the library of the configured store."`
Database string `yaml:"database" env:"POSTPROCESSING_STORE_DATABASE" desc:"The database name the configured store should use."`
Table string `yaml:"table" env:"POSTPROCESSING_STORE_TABLE" desc:"The database table the store should use."`
TTL time.Duration `yaml:"ttl" env:"OCIS_PERSISTENT_STORE_TTL;POSTPROCESSING_STORE_TTL" desc:"Time to live for events in the store. The duration can be set as number followed by a unit identifier like s, m or h. Defaults to '336h' (2 weeks)."`
Size int `yaml:"size" env:"OCIS_PERSISTENT_STORE_SIZE;POSTPROCESSING_STORE_SIZE" desc:"The maximum quantity of items in the store. Only applies when store type 'ocmem' is configured. Defaults to 512."`
}

View File

@@ -10,47 +10,41 @@ import (
// Postprocessing handles postprocessing of a file
type Postprocessing struct {
id string
url string
u *user.User
m map[events.Postprocessingstep]interface{}
filename string
filesize uint64
resourceID *provider.ResourceId
steps []events.Postprocessingstep
delay time.Duration
ID string
URL string
User *user.User
Filename string
Filesize uint64
ResourceID *provider.ResourceId
Steps []events.Postprocessingstep
PPDelay time.Duration
}
// New returns a new postprocessing instance
func New(uploadID string, uploadURL string, user *user.User, filename string, filesize uint64, resourceID *provider.ResourceId, steps []events.Postprocessingstep, delay time.Duration) *Postprocessing {
return &Postprocessing{
id: uploadID,
url: uploadURL,
u: user,
m: make(map[events.Postprocessingstep]interface{}),
filename: filename,
filesize: filesize,
resourceID: resourceID,
steps: steps,
delay: delay,
ID: uploadID,
URL: uploadURL,
User: user,
Filename: filename,
Filesize: filesize,
ResourceID: resourceID,
Steps: steps,
PPDelay: delay,
}
}
// Init is the first step of the postprocessing
func (pp *Postprocessing) Init(ev events.BytesReceived) interface{} {
pp.m["init"] = ev
if len(pp.steps) == 0 {
if len(pp.Steps) == 0 {
return pp.finished(events.PPOutcomeContinue)
}
return pp.nextStep(pp.steps[0])
return pp.nextStep(pp.Steps[0])
}
// NextStep returns the next postprocessing step
func (pp *Postprocessing) NextStep(ev events.PostprocessingStepFinished) interface{} {
pp.m[ev.FinishedStep] = ev
switch ev.Outcome {
case events.PPOutcomeContinue:
return pp.next(ev.FinishedStep)
@@ -62,16 +56,15 @@ func (pp *Postprocessing) NextStep(ev events.PostprocessingStepFinished) interfa
// Delay will sleep the configured time then continue
func (pp *Postprocessing) Delay(ev events.StartPostprocessingStep) interface{} {
pp.m[events.PPStepDelay] = ev
time.Sleep(pp.delay)
time.Sleep(pp.PPDelay)
return pp.next(events.PPStepDelay)
}
func (pp *Postprocessing) next(current events.Postprocessingstep) interface{} {
l := len(pp.steps)
for i, s := range pp.steps {
l := len(pp.Steps)
for i, s := range pp.Steps {
if s == current && i+1 < l {
return pp.nextStep(pp.steps[i+1])
return pp.nextStep(pp.Steps[i+1])
}
}
return pp.finished(events.PPOutcomeContinue)
@@ -79,22 +72,21 @@ func (pp *Postprocessing) next(current events.Postprocessingstep) interface{} {
func (pp *Postprocessing) nextStep(next events.Postprocessingstep) events.StartPostprocessingStep {
return events.StartPostprocessingStep{
UploadID: pp.id,
URL: pp.url,
ExecutingUser: pp.u,
Filename: pp.filename,
Filesize: pp.filesize,
ResourceID: pp.resourceID,
UploadID: pp.ID,
URL: pp.URL,
ExecutingUser: pp.User,
Filename: pp.Filename,
Filesize: pp.Filesize,
ResourceID: pp.ResourceID,
StepToStart: next,
}
}
func (pp *Postprocessing) finished(outcome events.PostprocessingOutcome) events.PostprocessingFinished {
return events.PostprocessingFinished{
UploadID: pp.id,
Result: pp.m,
ExecutingUser: pp.u,
Filename: pp.filename,
UploadID: pp.ID,
ExecutingUser: pp.User,
Filename: pp.Filename,
Outcome: outcome,
}
}

View File

@@ -1,10 +1,14 @@
package service
import (
"encoding/json"
"fmt"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/config"
"github.com/owncloud/ocis/v2/services/postprocessing/pkg/postprocessing"
"go-micro.dev/v4/store"
)
// PostprocessingService is an instance of the service handling postprocessing of files
@@ -13,11 +17,12 @@ type PostprocessingService struct {
events <-chan events.Event
pub events.Publisher
steps []events.Postprocessingstep
store store.Store
c config.Postprocessing
}
// NewPostprocessingService returns a new instance of a postprocessing service
func NewPostprocessingService(stream events.Stream, logger log.Logger, c config.Postprocessing) (*PostprocessingService, error) {
func NewPostprocessingService(stream events.Stream, logger log.Logger, sto store.Store, c config.Postprocessing) (*PostprocessingService, error) {
evs, err := events.Consume(stream, "postprocessing",
events.BytesReceived{},
events.StartPostprocessingStep{},
@@ -33,36 +38,51 @@ func NewPostprocessingService(stream events.Stream, logger log.Logger, c config.
events: evs,
pub: stream,
steps: getSteps(c),
store: sto,
c: c,
}, nil
}
// Run to fulfil Runner interface
func (pps *PostprocessingService) Run() error {
current := make(map[string]*postprocessing.Postprocessing)
for e := range pps.events {
var next interface{}
switch ev := e.Event.(type) {
case events.BytesReceived:
pp := postprocessing.New(ev.UploadID, ev.URL, ev.ExecutingUser, ev.Filename, ev.Filesize, ev.ResourceID, pps.steps, pps.c.Delayprocessing)
current[ev.UploadID] = pp
if err := storePP(pps.store, pp); err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot store upload")
continue
}
next = pp.Init(ev)
case events.PostprocessingStepFinished:
pp := current[ev.UploadID]
if pp == nil {
if ev.UploadID == "" {
// no current upload - this was an on demand scan
continue
}
pp, err := getPP(pps.store, ev.UploadID)
if err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
continue
}
next = pp.NextStep(ev)
case events.StartPostprocessingStep:
if ev.StepToStart != events.PPStepDelay {
continue
}
pp := current[ev.UploadID]
pp, err := getPP(pps.store, ev.UploadID)
if err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
continue
}
next = pp.Delay(ev)
case events.UploadReady:
// the storage provider thinks the upload is done - so no need to keep it any more
delete(current, ev.UploadID)
if err := pps.store.Delete(ev.UploadID); err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot delete upload")
continue
}
}
if next != nil {
@@ -87,3 +107,29 @@ func getSteps(c config.Postprocessing) []events.Postprocessingstep {
return steps
}
func storePP(sto store.Store, pp *postprocessing.Postprocessing) error {
b, err := json.Marshal(pp)
if err != nil {
return err
}
return sto.Write(&store.Record{
Key: pp.ID,
Value: b,
})
}
func getPP(sto store.Store, uploadID string) (*postprocessing.Postprocessing, error) {
recs, err := sto.Read(uploadID)
if err != nil {
return nil, err
}
if len(recs) != 1 {
return nil, fmt.Errorf("expected only one result for '%s', got %d", uploadID, len(recs))
}
var pp postprocessing.Postprocessing
return &pp, json.Unmarshal(recs[0].Value, &pp)
}