Merge pull request #8782 from kobergj/FixPostprocessingRestart

Repair Restart Postprocessing Logic
This commit is contained in:
kobergj
2024-04-09 08:17:59 +02:00
committed by GitHub
6 changed files with 83 additions and 21 deletions

View File

@@ -0,0 +1,8 @@
Bugfix: Fix restarting of postprocessing
When an upload is not found, the logic to restart postprocessing was broken. Additionally, we extended the upload sessions
command so that uploads can be restarted without using a second command.
NOTE: This also includes a breaking fix for the deprecated `ocis storage-users uploads list` command
https://github.com/owncloud/ocis/pull/8782

View File

@@ -81,12 +81,17 @@ See the [cs3 org](https://github.com/cs3org/reva/blob/edge/pkg/events/postproces
If postprocessing fails in one step due to an unforeseen error, current uploads will not be retried automatically. A system admin can instead run a CLI command to retry the failed upload, which is a two-step process:
- First find the upload ID of the failed upload.
- First list ongoing upload sessions
```bash
ocis storage-users uploads list
ocis storage-users uploads sessions
```
- Then use the restart command to resume postprocessing of the ID selected.
- If you want to restart all uploads, just rerun the command with the `--restart` flag
```bash
ocis storage-users uploads sessions --restart
```
- If you want to restart only one upload, use the postprocessing restart command
```bash
ocis postprocessing restart -u <uploadID>
```
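For orientation, the two paths side by side, using only flags that appear in this changeset (`--json` and the new `--restart`):
```bash
# list ongoing upload sessions as JSON to pick out an upload ID
ocis storage-users uploads sessions --json

# restart a single upload by its ID
ocis postprocessing restart -u <uploadID>

# or restart all listed sessions in one go
ocis storage-users uploads sessions --restart
```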

View File

@@ -35,9 +35,10 @@ func DefaultConfig() *config.Config {
MaxRetries: 14,
},
Store: config.Store{
Store: "memory",
Store: "nats-js-kv",
Nodes: []string{"127.0.0.1:9233"},
Database: "postprocessing",
Table: "postprocessing",
Table: "",
},
}
}
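For deployments that pin these values explicitly, the override would typically go through the service's store environment variables. The names below follow the usual `<SERVICE>_STORE` pattern and are an assumption, not verified against this exact release:
```bash
# assumed variable names following the usual <SERVICE>_STORE convention - verify
# against the postprocessing service documentation for your release
POSTPROCESSING_STORE=nats-js-kv
POSTPROCESSING_STORE_NODES=127.0.0.1:9233
POSTPROCESSING_STORE_DATABASE=postprocessing
```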

View File

@@ -40,7 +40,7 @@ func New(config config.Postprocessing) *Postprocessing {
}
// Init is the first step of the postprocessing
func (pp *Postprocessing) Init(ev events.BytesReceived) interface{} {
func (pp *Postprocessing) Init(_ events.BytesReceived) interface{} {
if len(pp.Steps) == 0 {
return pp.finished(events.PPOutcomeContinue)
}

View File

@@ -30,10 +30,12 @@ type PostprocessingService struct {
}
var (
// errFatal is returned when a fatal error occurs and we want to exit.
errFatal = errors.New("fatal error")
// ErrFatal is returned when a fatal error occurs and we want to exit.
ErrFatal = errors.New("fatal error")
// ErrEvent is returned when something went wrong with a specific event.
errEvent = errors.New("event error")
ErrEvent = errors.New("event error")
// ErrNotFound is returned when a postprocessing is not found in the store.
ErrNotFound = errors.New("postprocessing not found")
)
// NewPostprocessingService returns a new instance of a postprocessing service
@@ -67,9 +69,9 @@ func (pps *PostprocessingService) Run() error {
err := pps.processEvent(e)
if err != nil {
switch {
case errors.Is(err, errFatal):
case errors.Is(err, ErrFatal):
return err
case errors.Is(err, errEvent):
case errors.Is(err, ErrEvent):
continue
default:
pps.log.Fatal().Err(err).Msg("unknown error - exiting")
@@ -111,7 +113,7 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
pp, err = pps.getPP(pps.store, ev.UploadID)
if err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
return fmt.Errorf("%w: cannot get upload", errEvent)
return fmt.Errorf("%w: cannot get upload", ErrEvent)
}
next = pp.NextStep(ev)
@@ -143,7 +145,7 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
pp, err = pps.getPP(pps.store, ev.UploadID)
if err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
return fmt.Errorf("%w: cannot get upload", errEvent)
return fmt.Errorf("%w: cannot get upload", ErrEvent)
}
next = pp.Delay()
case events.UploadReady:
@@ -155,7 +157,7 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
// the storage provider thinks the upload is done - so no need to keep it any more
if err := pps.store.Delete(ev.UploadID); err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot delete upload")
return fmt.Errorf("%w: cannot delete upload", errEvent)
return fmt.Errorf("%w: cannot delete upload", ErrEvent)
}
case events.ResumePostprocessing:
return pps.handleResumePPEvent(ctx, ev)
@@ -166,14 +168,14 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
if err := storePP(pps.store, pp); err != nil {
pps.log.Error().Str("uploadID", pp.ID).Err(err).Msg("cannot store upload")
return fmt.Errorf("%w: cannot store upload", errEvent)
return fmt.Errorf("%w: cannot store upload", ErrEvent)
}
}
if next != nil {
if err := events.Publish(ctx, pps.pub, next); err != nil {
pps.log.Error().Err(err).Msg("unable to publish event")
return fmt.Errorf("%w: unable to publish event", errFatal) // we can't publish -> we are screwed
return fmt.Errorf("%w: unable to publish event", ErrFatal) // we can't publish -> we are screwed
}
}
return nil
@@ -182,10 +184,17 @@ func (pps *PostprocessingService) processEvent(e events.Event) error {
func (pps *PostprocessingService) getPP(sto store.Store, uploadID string) (*postprocessing.Postprocessing, error) {
recs, err := sto.Read(uploadID)
if err != nil {
if err == store.ErrNotFound {
return nil, ErrNotFound
}
return nil, err
}
if len(recs) != 1 {
if len(recs) == 0 {
return nil, ErrNotFound
}
if len(recs) > 1 {
return nil, fmt.Errorf("expected only one result for '%s', got %d", uploadID, len(recs))
}
@@ -231,7 +240,7 @@ func (pps *PostprocessingService) handleResumePPEvent(ctx context.Context, ev ev
for _, id := range ids {
if err := pps.resumePP(ctx, id); err != nil {
pps.log.Error().Str("uploadID", id).Err(err).Msg("cannot resume upload")
return fmt.Errorf("%w: cannot resume upload", errEvent)
return fmt.Errorf("cannot resume upload: %w", err)
}
}
return nil
@@ -240,7 +249,7 @@ func (pps *PostprocessingService) handleResumePPEvent(ctx context.Context, ev ev
func (pps *PostprocessingService) resumePP(ctx context.Context, uploadID string) error {
pp, err := pps.getPP(pps.store, uploadID)
if err != nil {
if err == store.ErrNotFound {
if err == ErrNotFound {
if err := events.Publish(ctx, pps.pub, events.RestartPostprocessing{
UploadID: uploadID,
Timestamp: utils.TSNow(),
@@ -249,7 +258,7 @@ func (pps *PostprocessingService) resumePP(ctx context.Context, uploadID string)
}
return nil
}
return fmt.Errorf("%w: cannot get upload", errEvent)
return fmt.Errorf("cannot get upload: %w", err)
}
return events.Publish(ctx, pps.pub, pp.CurrentStep())
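With the sentinels now exported (`ErrFatal`, `ErrEvent`, `ErrNotFound`), callers outside the package can branch on them via `errors.Is`. A minimal sketch; the import path and the `handle` helper are illustrative, not part of this diff:
```go
package example

import (
	"errors"
	"log"

	// illustrative import path for the postprocessing service package
	pps "github.com/owncloud/ocis/v2/services/postprocessing/pkg/service"
)

// handle is a hypothetical caller reacting to the exported sentinels.
func handle(err error) {
	switch {
	case err == nil:
		return
	case errors.Is(err, pps.ErrNotFound):
		// no stored postprocessing state - a RestartPostprocessing event can recreate it
		log.Println("no stored state, requesting a fresh restart")
	case errors.Is(err, pps.ErrEvent):
		// a single event failed - safe to continue with the next one
		log.Println("event error, continuing")
	default:
		// ErrFatal or an unknown error - the caller cannot continue
		log.Fatal(err)
	}
}
```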

View File

@@ -1,6 +1,7 @@
package command
import (
"context"
"encoding/json"
"fmt"
"os"
@@ -13,11 +14,14 @@ import (
"github.com/urfave/cli/v2"
userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/fs/registry"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
"github.com/owncloud/ocis/v2/services/storage-users/pkg/config"
"github.com/owncloud/ocis/v2/services/storage-users/pkg/config/parser"
"github.com/owncloud/ocis/v2/services/storage-users/pkg/event"
"github.com/owncloud/ocis/v2/services/storage-users/pkg/revaconfig"
)
@@ -69,7 +73,7 @@ func ListUploads(cfg *config.Config) *cli.Command {
fmt.Println("Incomplete uploads:")
for _, u := range uploads {
ref := u.Reference()
fmt.Printf(" - %s (Space: %s, Name: %s, Size: %d/%d, Expires: %s, Processing: %t)\n", ref.GetResourceId().GetSpaceId(), u.ID(), u.Filename(), u.Offset(), u.Size(), u.Expires(), u.IsProcessing())
fmt.Printf(" - %s (Space: %s, Name: %s, Size: %d/%d, Expires: %s, Processing: %t)\n", u.ID(), ref.GetResourceId().GetSpaceId(), u.Filename(), u.Offset(), u.Size(), u.Expires(), u.IsProcessing())
}
return nil
},
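The breaking change mentioned in the changelog is this `Printf`: the upload ID now prints first and the space ID second, so scripts parsing the old order will break. With placeholders, a line of the fixed output reads:
```
 - <uploadID> (Space: <spaceID>, Name: <filename>, Size: <offset>/<size>, Expires: <expires>, Processing: <true|false>)
```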
@@ -101,6 +105,10 @@ func ListUploadSessions(cfg *config.Config) *cli.Command {
Name: "json",
Usage: "output as json",
},
&cli.BoolFlag{
Name: "restart",
Usage: "send restart event for all listed sessions",
},
},
Before: func(c *cli.Context) error {
return configlog.ReturnFatal(parser.ParseConfig(cfg))
@@ -124,6 +132,15 @@ func ListUploadSessions(cfg *config.Config) *cli.Command {
os.Exit(1)
}
var stream events.Stream
if c.Bool("restart") {
stream, err = event.NewStream(cfg)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to create event stream: %v\n", err)
os.Exit(1)
}
}
var b strings.Builder
filter := storage.UploadSessionFilter{}
if c.IsSet("processing") {
@@ -200,6 +217,17 @@ func ListUploadSessions(cfg *config.Config) *cli.Command {
fmt.Println(err)
}
fmt.Println(string(j))
if c.Bool("restart") {
if err := events.Publish(context.Background(), stream, events.ResumePostprocessing{
UploadID: u.ID(),
Timestamp: utils.TSNow(),
}); err != nil {
fmt.Fprintf(os.Stderr, "Failed to send restart event for upload session '%s'\n", u.ID())
// if publishing fails there is no need to try publishing other events - they will fail too.
os.Exit(1)
}
}
}
} else {
@@ -223,6 +251,17 @@ func ListUploadSessions(cfg *config.Config) *cli.Command {
u.Expires().Format(time.RFC3339),
strconv.FormatBool(u.IsProcessing()),
})
if c.Bool("restart") {
if err := events.Publish(context.Background(), stream, events.ResumePostprocessing{
UploadID: u.ID(),
Timestamp: utils.TSNow(),
}); err != nil {
fmt.Fprintf(os.Stderr, "Failed to send restart event for upload session '%s'\n", u.ID())
// if publishing fails there is no need to try publishing other events - they will fail too.
os.Exit(1)
}
}
}
table.Render()
}
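Condensed, the new `--restart` flag amounts to one `ResumePostprocessing` event per listed session. A self-contained sketch using only identifiers that appear in this diff; the `restartSession` wrapper itself is hypothetical:
```go
package example

import (
	"context"
	"fmt"

	"github.com/cs3org/reva/v2/pkg/events"
	"github.com/cs3org/reva/v2/pkg/utils"
)

// restartSession publishes a ResumePostprocessing event for a single upload ID,
// mirroring what the CLI does for each listed session.
func restartSession(stream events.Stream, uploadID string) error {
	if err := events.Publish(context.Background(), stream, events.ResumePostprocessing{
		UploadID:  uploadID,
		Timestamp: utils.TSNow(),
	}); err != nil {
		// if publishing fails there is no need to try other sessions - they will fail too
		return fmt.Errorf("failed to send restart event for upload session '%s': %w", uploadID, err)
	}
	return nil
}
```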