diff --git a/services/antivirus/pkg/service/service.go b/services/antivirus/pkg/service/service.go index 09952d903..5b3fac146 100644 --- a/services/antivirus/pkg/service/service.go +++ b/services/antivirus/pkg/service/service.go @@ -121,6 +121,7 @@ func (av Antivirus) Run() error { av.l.Fatal().Err(err).Str("uploadid", ev.UploadID).Interface("resourceID", ev.ResourceID).Msg("cannot publish events - exiting") return err } + continue } av.l.Debug().Str("uploadid", ev.UploadID).Str("filename", ev.Filename).Msg("Starting virus scan.") diff --git a/services/postprocessing/pkg/command/postprocessing.go b/services/postprocessing/pkg/command/postprocessing.go new file mode 100644 index 000000000..b49d4b9ef --- /dev/null +++ b/services/postprocessing/pkg/command/postprocessing.go @@ -0,0 +1,58 @@ +package command + +import ( + "fmt" + "time" + + "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/utils" + "github.com/owncloud/ocis/v2/ocis-pkg/config/configlog" + "github.com/owncloud/ocis/v2/services/postprocessing/pkg/config" + "github.com/owncloud/ocis/v2/services/postprocessing/pkg/config/parser" + "github.com/urfave/cli/v2" +) + +// RestartPostprocessing cli command to restart postprocessing +func RestartPostprocessing(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "restart", + Usage: "restart postprocessing for an uploadID", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "upload-id", + Aliases: []string{"u"}, + Required: true, + Usage: "the uploadid to restart", + }, + }, + Before: func(c *cli.Context) error { + return configlog.ReturnFatal(parser.ParseConfig(cfg)) + }, + Action: func(c *cli.Context) error { + stream, err := getEventBus(cfg.Postprocessing.Events) + if err != nil { + return err + } + + ev := events.ResumePostprocessing{ + UploadID: c.String("upload-id"), + Timestamp: utils.TSNow(), + } + + if err := events.Publish(stream, ev); err != nil { + fmt.Println(err) + return err + } + + // go-micro nats implementation uses async publishing, + // therefore we need to manually wait. + // + // FIXME: upstream pr + // + // https://github.com/go-micro/plugins/blob/3e77393890683be4bacfb613bc5751867d584692/v4/events/natsjs/nats.go#L115 + time.Sleep(5 * time.Second) + + return nil + }, + } +} diff --git a/services/postprocessing/pkg/command/root.go b/services/postprocessing/pkg/command/root.go index 4718a58bd..f4a208632 100644 --- a/services/postprocessing/pkg/command/root.go +++ b/services/postprocessing/pkg/command/root.go @@ -15,6 +15,7 @@ func GetCommands(cfg *config.Config) cli.Commands { Server(cfg), // interaction with this service + RestartPostprocessing(cfg), // infos about this service Health(cfg), diff --git a/services/postprocessing/pkg/command/server.go b/services/postprocessing/pkg/command/server.go index 8824a630f..fb81fc714 100644 --- a/services/postprocessing/pkg/command/server.go +++ b/services/postprocessing/pkg/command/server.go @@ -7,6 +7,7 @@ import ( "fmt" "os" + "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/events/stream" "github.com/cs3org/reva/v2/pkg/store" "github.com/go-micro/plugins/v4/events/natsjs" @@ -42,9 +43,6 @@ func Server(cfg *config.Config) *cli.Command { gr = run.Group{} logger = logging.Configure(cfg.Service.Name, cfg.Log) - evtsCfg = cfg.Postprocessing.Events - tlsConf *tls.Config - ctx, cancel = func() (context.Context, context.CancelFunc) { if cfg.Context == nil { return context.WithCancel(context.Background()) @@ -55,31 +53,7 @@ func Server(cfg *config.Config) *cli.Command { defer cancel() { - if evtsCfg.EnableTLS { - var rootCAPool *x509.CertPool - if evtsCfg.TLSRootCACertificate != "" { - rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate) - if err != nil { - return err - } - - rootCAPool, err = ociscrypto.NewCertPoolFromPEM(rootCrtFile) - if err != nil { - return err - } - evtsCfg.TLSInsecure = false - } - - tlsConf = &tls.Config{ - RootCAs: rootCAPool, - } - } - - bus, err := stream.Nats( - natsjs.TLSConfig(tlsConf), - natsjs.Address(evtsCfg.Endpoint), - natsjs.ClusterID(evtsCfg.Cluster), - ) + bus, err := getEventBus(cfg.Postprocessing.Events) if err != nil { return err } @@ -136,3 +110,32 @@ func Server(cfg *config.Config) *cli.Command { }, } } + +func getEventBus(evtsCfg config.Events) (events.Stream, error) { + var tlsConf *tls.Config + if evtsCfg.EnableTLS { + var rootCAPool *x509.CertPool + if evtsCfg.TLSRootCACertificate != "" { + rootCrtFile, err := os.Open(evtsCfg.TLSRootCACertificate) + if err != nil { + return nil, err + } + + rootCAPool, err = ociscrypto.NewCertPoolFromPEM(rootCrtFile) + if err != nil { + return nil, err + } + evtsCfg.TLSInsecure = false + } + + tlsConf = &tls.Config{ + RootCAs: rootCAPool, + } + } + + return stream.Nats( + natsjs.TLSConfig(tlsConf), + natsjs.Address(evtsCfg.Endpoint), + natsjs.ClusterID(evtsCfg.Cluster), + ) +}