Non-durable SSE streams (#7986)

* make sse streams non-durable

Signed-off-by: jkoberg <jkoberg@owncloud.com>

* bump reva

Signed-off-by: jkoberg <jkoberg@owncloud.com>

---------

Signed-off-by: jkoberg <jkoberg@owncloud.com>
This commit is contained in:
kobergj
2023-12-18 11:47:25 +01:00
committed by GitHub
parent c7500fead5
commit 377a23ee7d
31 changed files with 173 additions and 60 deletions

View File

@@ -228,7 +228,7 @@ func publisherFromConfig(m map[string]interface{}) (events.Publisher, error) {
if ok {
tlsCert = val.(string)
}
return stream.NatsFromConfig(m["name"].(string), stream.NatsConfig{
return stream.NatsFromConfig(m["name"].(string), false, stream.NatsConfig{
Endpoint: m["address"].(string),
Cluster: m["clusterID"].(string),
EnableTLS: m["enable-tls"].(bool),

View File

@@ -1267,5 +1267,5 @@ func estreamFromConfig(c eventconfig) (events.Stream, error) {
return nil, nil
}
return stream.NatsFromConfig("storageprovider", stream.NatsConfig(c))
return stream.NatsFromConfig("storageprovider", false, stream.NatsConfig(c))
}

View File

@@ -80,7 +80,7 @@ func New(m map[string]interface{}, log *zerolog.Logger) (global.Service, error)
if conf.NatsAddress == "" || conf.NatsClusterID == "" {
log.Warn().Msg("missing or incomplete nats configuration. Events will not be published.")
} else {
s, err := stream.NatsFromConfig("dataprovider", stream.NatsConfig{
s, err := stream.NatsFromConfig("dataprovider", false, stream.NatsConfig{
Endpoint: conf.NatsAddress,
Cluster: conf.NatsClusterID,
EnableTLS: conf.NatsEnableTLS,

View File

@@ -25,7 +25,7 @@ type NatsConfig struct {
}
// NatsFromConfig returns a nats stream from the given config
func NatsFromConfig(connName string, cfg NatsConfig) (events.Stream, error) {
func NatsFromConfig(connName string, disableDurability bool, cfg NatsConfig) (events.Stream, error) {
var tlsConf *tls.Config
if cfg.EnableTLS {
var rootCAPool *x509.CertPool
@@ -48,13 +48,20 @@ func NatsFromConfig(connName string, cfg NatsConfig) (events.Stream, error) {
RootCAs: rootCAPool,
}
}
return Nats(
opts := []natsjs.Option{
natsjs.TLSConfig(tlsConf),
natsjs.Address(cfg.Endpoint),
natsjs.ClusterID(cfg.Cluster),
natsjs.SynchronousPublish(true),
natsjs.Name(connName),
)
}
if disableDurability {
opts = append(opts, natsjs.DisableDurableStreams())
}
return Nats(opts...)
}

View File

@@ -17,4 +17,44 @@ var (
Name: "reva_upload_active",
Help: "Number of active uploads",
})
// UploadProcessing is the number of uploads in processing
UploadProcessing = promauto.NewGauge(prometheus.GaugeOpts{
Name: "reva_upload_processing",
Help: "Number of uploads in processing",
})
// UploadSessionsInitiated is the number of upload sessions that have been initiated
UploadSessionsInitiated = promauto.NewCounter(prometheus.CounterOpts{
Name: "reva_upload_sessions_initiated",
Help: "Number of uploads sessions that were initiated",
})
// UploadSessionsBytesReceived is the number of upload sessions that have received all bytes
UploadSessionsBytesReceived = promauto.NewCounter(prometheus.CounterOpts{
Name: "reva_upload_sessions_bytes_received",
Help: "Number of uploads sessions that have received all bytes",
})
// UploadSessionsFinalized is the number of upload sessions that have received all bytes
UploadSessionsFinalized = promauto.NewCounter(prometheus.CounterOpts{
Name: "reva_upload_sessions_finalized",
Help: "Number of uploads sessions that have successfully completed",
})
// UploadSessionsAborted is the number of upload sessions that have been aborted
UploadSessionsAborted = promauto.NewCounter(prometheus.CounterOpts{
Name: "reva_upload_sessions_aborted",
Help: "Number of uploads sessions that have aborted by postprocessing",
})
// UploadSessionsDeleted is the number of upload sessions that have been deleted
UploadSessionsDeleted = promauto.NewCounter(prometheus.CounterOpts{
Name: "reva_upload_sessions_deleted",
Help: "Number of uploads sessions that have been deleted by postprocessing",
})
// UploadSessionsRestarted is the number of upload sessions that have been restarted
UploadSessionsRestarted = promauto.NewCounter(prometheus.CounterOpts{
Name: "reva_upload_sessions_restarted",
Help: "Number of uploads sessions that have been restarted by postprocessing",
})
// UploadSessionsScanned is the number of upload sessions that have been scanned by antivirus
UploadSessionsScanned = promauto.NewCounter(prometheus.CounterOpts{
Name: "reva_upload_sessions_scanned",
Help: "Number of uploads sessions that have been scanned by antivirus",
})
)

View File

@@ -175,7 +175,7 @@ func NewDefault(m map[string]interface{}) (share.Manager, error) {
var es events.Stream
if c.Events.Endpoint != "" {
es, err = stream.NatsFromConfig("jsoncs3-share-manager", stream.NatsConfig(c.Events))
es, err = stream.NatsFromConfig("jsoncs3-share-manager", false, stream.NatsConfig(c.Events))
if err != nil {
return nil, err
}

View File

@@ -42,6 +42,7 @@ import (
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/logger"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/rhttp/datatx/metrics"
"github.com/cs3org/reva/v2/pkg/rhttp/datatx/utils/download"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/cache"
@@ -268,14 +269,18 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
case events.PPOutcomeAbort:
failed = true
keepUpload = true
metrics.UploadSessionsAborted.Inc()
case events.PPOutcomeContinue:
if err := up.Finalize(); err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not finalize upload")
keepUpload = true // should we keep the upload when assembling failed?
failed = true
} else {
metrics.UploadSessionsFinalized.Inc()
}
case events.PPOutcomeDelete:
failed = true
metrics.UploadSessionsDeleted.Inc()
}
getParent := func() *node.Node {
@@ -344,6 +349,9 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not create url")
continue
}
metrics.UploadSessionsRestarted.Inc()
// restart postprocessing
if err := events.Publish(ctx, fs.stream, events.BytesReceived{
UploadID: up.Info.ID,
@@ -471,6 +479,8 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
continue
}
metrics.UploadSessionsScanned.Inc()
// remove cache entry in gateway
fs.cache.RemoveStatContext(ctx, ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID})
default:

View File

@@ -41,6 +41,7 @@ import (
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/mime"
"github.com/cs3org/reva/v2/pkg/rhttp/datatx/metrics"
"github.com/cs3org/reva/v2/pkg/storage/utils/ace"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
@@ -1220,6 +1221,9 @@ func (n *Node) FindStorageSpaceRoot(ctx context.Context) error {
// UnmarkProcessing removes the processing flag from the node
func (n *Node) UnmarkProcessing(ctx context.Context, uploadID string) error {
// we currently have to decrease the counter for every processing run to match the incrases
metrics.UploadProcessing.Sub(1)
v, _ := n.XattrString(ctx, prefixes.StatusPrefix)
if v != ProcessingStatus+uploadID {
// file started another postprocessing later - do not remove

View File

@@ -34,6 +34,7 @@ import (
"github.com/cs3org/reva/v2/pkg/appctx"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/rhttp/datatx/metrics"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/utils/chunking"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
@@ -215,6 +216,8 @@ func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Refere
info, _ = upload.GetInfo(ctx)
metrics.UploadSessionsInitiated.Inc()
return map[string]string{
"simple": info.ID,
"tus": info.ID,

View File

@@ -39,6 +39,7 @@ import (
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/rhttp/datatx/metrics"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
@@ -265,6 +266,11 @@ func (upload *Upload) FinishUpload(_ context.Context) error {
return err
}
// increase the processing counter for every started processing
// will be decreased in Cleanup()
metrics.UploadProcessing.Inc()
metrics.UploadSessionsBytesReceived.Inc()
upload.Node = n
if upload.pub != nil {
@@ -295,6 +301,7 @@ func (upload *Upload) FinishUpload(_ context.Context) error {
log.Error().Err(err).Msg("failed to upload")
return err
}
metrics.UploadSessionsFinalized.Inc()
}
return upload.tp.Propagate(upload.Ctx, n, upload.SizeDiff)

View File

@@ -1,3 +1,4 @@
// Package natsjs provides a NATS Jetstream implementation of the events.Stream interface.
package natsjs
import (
@@ -33,11 +34,14 @@ func NewStream(opts ...Option) (events.Stream, error) {
}
s := &stream{opts: options}
natsJetStreamCtx, err := connectToNatsJetStream(options)
if err != nil {
return nil, fmt.Errorf("error connecting to nats cluster %v: %v", options.ClusterID, err)
return nil, fmt.Errorf("error connecting to nats cluster %v: %w", options.ClusterID, err)
}
s.natsJetStreamCtx = natsJetStreamCtx
return s, nil
}
@@ -52,6 +56,7 @@ func connectToNatsJetStream(options Options) (nats.JetStreamContext, error) {
nopts.Secure = true
nopts.TLSConfig = options.TLSConfig
}
if options.NkeyConfig != "" {
nopts.Nkey = options.NkeyConfig
}
@@ -63,14 +68,21 @@ func connectToNatsJetStream(options Options) (nats.JetStreamContext, error) {
if options.Name != "" {
nopts.Name = options.Name
}
if options.Username != "" && options.Password != "" {
nopts.User = options.Username
nopts.Password = options.Password
}
conn, err := nopts.Connect()
if err != nil {
return nil, fmt.Errorf("error connecting to nats at %v with tls enabled (%v): %v", options.Address, nopts.TLSConfig != nil, err)
tls := nopts.TLSConfig != nil
return nil, fmt.Errorf("error connecting to nats at %v with tls enabled (%v): %w", options.Address, tls, err)
}
js, err := conn.JetStream()
if err != nil {
return nil, fmt.Errorf("error while obtaining JetStream context: %v", err)
return nil, fmt.Errorf("error while obtaining JetStream context: %w", err)
}
return js, nil
@@ -125,6 +137,7 @@ func (s *stream) Publish(topic string, msg interface{}, opts ...events.PublishOp
if err != nil {
err = errors.Wrap(err, "Error publishing message to topic")
}
return err
}
@@ -154,14 +167,14 @@ func (s *stream) Consume(topic string, opts ...events.ConsumeOption) (<-chan eve
}
// setup the subscriber
c := make(chan events.Event)
handleMsg := func(m *nats.Msg) {
channel := make(chan events.Event)
handleMsg := func(msg *nats.Msg) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
// decode the message
var evt events.Event
if err := json.Unmarshal(m.Data, &evt); err != nil {
if err := json.Unmarshal(msg.Data, &evt); err != nil {
log.Logf(logger.ErrorLevel, "Error decoding message: %v", err)
// not acknowledging the message is the way to indicate an error occurred
return
@@ -170,23 +183,23 @@ func (s *stream) Consume(topic string, opts ...events.ConsumeOption) (<-chan eve
if !options.AutoAck {
// set up the ack funcs
evt.SetAckFunc(func() error {
return m.Ack()
return msg.Ack()
})
evt.SetNackFunc(func() error {
return m.Nak()
return msg.Nak()
})
}
// push onto the channel and wait for the consumer to take the event off before we acknowledge it.
c <- evt
channel <- evt
if !options.AutoAck {
return
}
if err := m.Ack(nats.Context(ctx)); err != nil {
if err := msg.Ack(nats.Context(ctx)); err != nil {
log.Logf(logger.ErrorLevel, "Error acknowledging message: %v", err)
}
}
// ensure that a stream exists for that topic
@@ -203,9 +216,7 @@ func (s *stream) Consume(topic string, opts ...events.ConsumeOption) (<-chan eve
}
// setup the options
subOpts := []nats.SubOpt{
nats.Durable(options.Group),
}
subOpts := []nats.SubOpt{}
if options.CustomRetries {
subOpts = append(subOpts, nats.MaxDeliver(options.GetRetryLimit()))
@@ -227,11 +238,18 @@ func (s *stream) Consume(topic string, opts ...events.ConsumeOption) (<-chan eve
subOpts = append(subOpts, nats.AckWait(options.AckWait))
}
// connect the subscriber
_, err = s.natsJetStreamCtx.QueueSubscribe(topic, options.Group, handleMsg, subOpts...)
// connect the subscriber via a queue group only if durable streams are enabled
if !s.opts.DisableDurableStreams {
subOpts = append(subOpts, nats.Durable(options.Group))
_, err = s.natsJetStreamCtx.QueueSubscribe(topic, options.Group, handleMsg, subOpts...)
} else {
subOpts = append(subOpts, nats.ConsumerName(options.Group))
_, err = s.natsJetStreamCtx.Subscribe(topic, handleMsg, subOpts...)
}
if err != nil {
return nil, errors.Wrap(err, "Error subscribing to topic")
}
return c, nil
return channel, nil
}

View File

@@ -8,14 +8,17 @@ import (
// Options which are used to configure the nats stream.
type Options struct {
ClusterID string
ClientID string
Address string
NkeyConfig string
TLSConfig *tls.Config
Logger logger.Logger
SyncPublish bool
Name string
ClusterID string
ClientID string
Address string
NkeyConfig string
TLSConfig *tls.Config
Logger logger.Logger
SyncPublish bool
Name string
DisableDurableStreams bool
Username string
Password string
}
// Option is a function which configures options.
@@ -49,30 +52,45 @@ func TLSConfig(t *tls.Config) Option {
}
}
// Nkey string to use when connecting to the cluster.
// NkeyConfig string to use when connecting to the cluster.
func NkeyConfig(nkey string) Option {
return func(o *Options) {
o.NkeyConfig = nkey
}
}
// Logger sets the underlyin logger
// Logger sets the underlying logger.
func Logger(log logger.Logger) Option {
return func(o *Options) {
o.Logger = log
}
}
// SynchronousPublish allows using a synchronous publishing instead of the default asynchronous
// SynchronousPublish allows using a synchronous publishing instead of the default asynchronous.
func SynchronousPublish(sync bool) Option {
return func(o *Options) {
o.SyncPublish = sync
}
}
// Name allows to add a name to the natsjs connection
// Name allows to add a name to the natsjs connection.
func Name(name string) Option {
return func(o *Options) {
o.Name = name
}
}
// DisableDurableStreams will disable durable streams.
func DisableDurableStreams() Option {
return func(o *Options) {
o.DisableDurableStreams = true
}
}
// Authenticate authenticates the connection with the given username and password.
func Authenticate(username, password string) Option {
return func(o *Options) {
o.Username = username
o.Password = password
}
}