From 400b9a5d30b0ad60f492aff66ce9f9be0aec8bec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Mon, 7 Jul 2025 15:36:50 +0200 Subject: [PATCH 1/8] Expose nats metrics of the search service --- services/search/.mockery.yaml | 2 +- services/search/pkg/metrics/metrics.go | 32 +++++++++++++++++-- services/search/pkg/search/events.go | 30 ++++++++++++++++- services/search/pkg/search/events_test.go | 2 +- services/search/pkg/server/grpc/server.go | 1 + services/search/pkg/service/grpc/v0/option.go | 11 +++++++ .../search/pkg/service/grpc/v0/service.go | 2 +- 7 files changed, 73 insertions(+), 7 deletions(-) diff --git a/services/search/.mockery.yaml b/services/search/.mockery.yaml index 12e0d3e06..1c2905419 100644 --- a/services/search/.mockery.yaml +++ b/services/search/.mockery.yaml @@ -15,4 +15,4 @@ packages: Retriever: {} github.com/opencloud-eu/opencloud/services/search/pkg/search: interfaces: - Searcher: {} + Searcher: {} \ No newline at end of file diff --git a/services/search/pkg/metrics/metrics.go b/services/search/pkg/metrics/metrics.go index c8459d782..437dc544c 100644 --- a/services/search/pkg/metrics/metrics.go +++ b/services/search/pkg/metrics/metrics.go @@ -1,6 +1,8 @@ package metrics -import "github.com/prometheus/client_golang/prometheus" +import ( + "github.com/prometheus/client_golang/prometheus" +) var ( // Namespace defines the namespace for the defines metrics. @@ -13,7 +15,10 @@ var ( // Metrics defines the available metrics of this service. type Metrics struct { // Counter *prometheus.CounterVec - BuildInfo *prometheus.GaugeVec + BuildInfo *prometheus.GaugeVec + EventsOutstandingAcks prometheus.Gauge + EventsUnprocessed prometheus.Gauge + EventsRedelivered prometheus.Gauge } // New initializes the available metrics. @@ -25,9 +30,30 @@ func New() *Metrics { Name: "build_info", Help: "Build information", }, []string{"version"}), + EventsOutstandingAcks: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: Subsystem, + Name: "events_outstanding_acks", + Help: "Number of outstanding acks for events", + }), + EventsUnprocessed: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: Subsystem, + Name: "events_unprocessed", + Help: "Number of unprocessed events", + }), + EventsRedelivered: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: Subsystem, + Name: "events_redelivered", + Help: "Number of redelivered events", + }), } _ = prometheus.Register(m.BuildInfo) - // TODO: implement metrics + _ = prometheus.Register(m.EventsOutstandingAcks) + _ = prometheus.Register(m.EventsUnprocessed) + _ = prometheus.Register(m.EventsRedelivered) + return m } diff --git a/services/search/pkg/search/events.go b/services/search/pkg/search/events.go index 32c4c8c53..eca39f94f 100644 --- a/services/search/pkg/search/events.go +++ b/services/search/pkg/search/events.go @@ -1,11 +1,13 @@ package search import ( + "context" "time" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/opencloud-eu/opencloud/pkg/log" "github.com/opencloud-eu/opencloud/services/search/pkg/config" + "github.com/opencloud-eu/opencloud/services/search/pkg/metrics" "github.com/opencloud-eu/reva/v2/pkg/events" "github.com/opencloud-eu/reva/v2/pkg/events/raw" "github.com/opencloud-eu/reva/v2/pkg/storagespace" @@ -13,7 +15,7 @@ import ( // HandleEvents listens to the needed events, // it handles the whole resource indexing livecycle. -func HandleEvents(s Searcher, stream raw.Stream, cfg *config.Config, logger log.Logger) error { +func HandleEvents(s Searcher, stream raw.Stream, cfg *config.Config, m *metrics.Metrics, logger log.Logger) error { evts := []events.Unmarshaller{ events.ItemTrashed{}, events.ItemRestored{}, @@ -37,6 +39,10 @@ func HandleEvents(s Searcher, stream raw.Stream, cfg *config.Config, logger log. return err } + if m != nil { + monitorMetrics(stream, "search-pull", m, logger) + } + if cfg.Events.NumConsumers == 0 { cfg.Events.NumConsumers = 1 } @@ -103,3 +109,25 @@ func HandleEvents(s Searcher, stream raw.Stream, cfg *config.Config, logger log. return nil } + +func monitorMetrics(stream raw.Stream, name string, m *metrics.Metrics, logger log.Logger) { + ctx := context.Background() + consumer, err := stream.JetStream().Consumer(ctx, name) + if err != nil { + logger.Error().Err(err).Msg("failed to get consumer") + } + ticker := time.NewTicker(5 * time.Second) + go func() { + for range ticker.C { + info, err := consumer.Info(ctx) + if err != nil { + logger.Error().Err(err).Msg("failed to get consumer") + } + + m.EventsOutstandingAcks.Set(float64(info.NumAckPending)) + m.EventsUnprocessed.Set(float64(info.NumPending)) + m.EventsRedelivered.Set(float64(info.NumRedelivered)) + logger.Debug().Msg("updated event metrics") + } + }() +} diff --git a/services/search/pkg/search/events_test.go b/services/search/pkg/search/events_test.go index 15d37e541..664078581 100644 --- a/services/search/pkg/search/events_test.go +++ b/services/search/pkg/search/events_test.go @@ -31,7 +31,7 @@ var _ = DescribeTable("events", Events: config.Events{ AsyncUploads: asyncUploads, }, - }, log.NewLogger()) + }, nil, log.NewLogger()) for _, mck := range mcks { s.On(mck, mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) { diff --git a/services/search/pkg/server/grpc/server.go b/services/search/pkg/server/grpc/server.go index 61e59a4f5..d3a012ec8 100644 --- a/services/search/pkg/server/grpc/server.go +++ b/services/search/pkg/server/grpc/server.go @@ -36,6 +36,7 @@ func Server(opts ...Option) (grpc.Service, func(), error) { svc.Logger(options.Logger), svc.JWTSecret(options.JWTSecret), svc.TracerProvider(options.TraceProvider), + svc.Metrics(options.Metrics), ) if err != nil { options.Logger.Error(). diff --git a/services/search/pkg/service/grpc/v0/option.go b/services/search/pkg/service/grpc/v0/option.go index 2bb8d0bf3..b7ab82250 100644 --- a/services/search/pkg/service/grpc/v0/option.go +++ b/services/search/pkg/service/grpc/v0/option.go @@ -3,6 +3,7 @@ package service import ( "github.com/opencloud-eu/opencloud/pkg/log" "github.com/opencloud-eu/opencloud/services/search/pkg/config" + "github.com/opencloud-eu/opencloud/services/search/pkg/metrics" "go.opentelemetry.io/otel/trace" ) @@ -15,6 +16,7 @@ type Options struct { Config *config.Config JWTSecret string TracerProvider trace.TracerProvider + Metrics *metrics.Metrics } func newOptions(opts ...Option) Options { @@ -54,3 +56,12 @@ func TracerProvider(val trace.TracerProvider) Option { o.TracerProvider = val } } + +// Metrics provides a function to set the Metrics option. +func Metrics(val *metrics.Metrics) Option { + return func(o *Options) { + if val != nil { + o.Metrics = val + } + } +} diff --git a/services/search/pkg/service/grpc/v0/service.go b/services/search/pkg/service/grpc/v0/service.go index 59dc238e9..bb410e564 100644 --- a/services/search/pkg/service/grpc/v0/service.go +++ b/services/search/pkg/service/grpc/v0/service.go @@ -98,7 +98,7 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, func(), error) return nil, teardown, err } - if err := search.HandleEvents(ss, stream, cfg, logger); err != nil { + if err := search.HandleEvents(ss, stream, cfg, options.Metrics, logger); err != nil { return nil, teardown, err } From 273c0ed270f3684194c34c20d435967078e4842f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Tue, 8 Jul 2025 12:01:00 +0200 Subject: [PATCH 2/8] Expose nats metris for the postprocessing service --- .../postprocessing/pkg/metrics/metrics.go | 69 +++++++++++++++++++ .../postprocessing/pkg/service/service.go | 66 +++++++++++++----- services/search/pkg/search/events.go | 2 +- 3 files changed, 120 insertions(+), 17 deletions(-) create mode 100644 services/postprocessing/pkg/metrics/metrics.go diff --git a/services/postprocessing/pkg/metrics/metrics.go b/services/postprocessing/pkg/metrics/metrics.go new file mode 100644 index 000000000..077a04018 --- /dev/null +++ b/services/postprocessing/pkg/metrics/metrics.go @@ -0,0 +1,69 @@ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + // Namespace defines the namespace for the defines metrics. + Namespace = "opencloud" + + // Subsystem defines the subsystem for the defines metrics. + Subsystem = "postprocessing" +) + +// Metrics defines the available metrics of this service. +type Metrics struct { + // Counter *prometheus.CounterVec + BuildInfo *prometheus.GaugeVec + EventsOutstandingAcks prometheus.Gauge + EventsUnprocessed prometheus.Gauge + EventsRedelivered prometheus.Gauge + InProgress prometheus.Gauge + Finished *prometheus.CounterVec +} + +// New initializes the available metrics. +func New() *Metrics { + m := &Metrics{ + BuildInfo: promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: Subsystem, + Name: "build_info", + Help: "Build information", + }, []string{"version"}), + EventsOutstandingAcks: promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: Subsystem, + Name: "events_outstanding_acks", + Help: "Number of outstanding acks for events", + }), + EventsUnprocessed: promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: Subsystem, + Name: "events_unprocessed", + Help: "Number of unprocessed events", + }), + EventsRedelivered: promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: Subsystem, + Name: "events_redelivered", + Help: "Number of redelivered events", + }), + InProgress: promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: Subsystem, + Name: "in_progress", + Help: "Number of postprocessing events in progress", + }), + Finished: promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: Namespace, + Subsystem: Subsystem, + Name: "finished", + Help: "Number of finished postprocessing events", + }, []string{"status"}), + } + + return m +} diff --git a/services/postprocessing/pkg/service/service.go b/services/postprocessing/pkg/service/service.go index 2b41b2888..c9ca40d3f 100644 --- a/services/postprocessing/pkg/service/service.go +++ b/services/postprocessing/pkg/service/service.go @@ -9,7 +9,9 @@ import ( "time" "github.com/opencloud-eu/opencloud/pkg/log" + "github.com/opencloud-eu/opencloud/pkg/version" "github.com/opencloud-eu/opencloud/services/postprocessing/pkg/config" + "github.com/opencloud-eu/opencloud/services/postprocessing/pkg/metrics" "github.com/opencloud-eu/opencloud/services/postprocessing/pkg/postprocessing" ctxpkg "github.com/opencloud-eu/reva/v2/pkg/ctx" "github.com/opencloud-eu/reva/v2/pkg/events" @@ -22,14 +24,15 @@ import ( // PostprocessingService is an instance of the service handling postprocessing of files type PostprocessingService struct { - ctx context.Context - log log.Logger - events <-chan raw.Event - pub events.Publisher - steps []events.Postprocessingstep - store store.Store - c config.Postprocessing - tp trace.TracerProvider + ctx context.Context + log log.Logger + events <-chan raw.Event + pub events.Publisher + steps []events.Postprocessingstep + store store.Store + c config.Postprocessing + tp trace.TracerProvider + metrics *metrics.Metrics } var ( @@ -78,15 +81,20 @@ func NewPostprocessingService(ctx context.Context, logger log.Logger, sto store. return nil, err } + m := metrics.New() + m.BuildInfo.WithLabelValues(version.GetString()).Set(1) + monitorMetrics(raw, "postprocessing-pull", m, logger) + return &PostprocessingService{ - ctx: ctx, - log: logger, - events: evs, - pub: pub, - steps: getSteps(cfg.Postprocessing), - store: sto, - c: cfg.Postprocessing, - tp: tp, + ctx: ctx, + log: logger, + events: evs, + pub: pub, + steps: getSteps(cfg.Postprocessing), + store: sto, + c: cfg.Postprocessing, + tp: tp, + metrics: m, }, nil } @@ -150,6 +158,7 @@ func (pps *PostprocessingService) processEvent(e raw.Event) error { InitiatorID: e.InitiatorID, ImpersonatingUser: ev.ImpersonatingUser, } + pps.metrics.InProgress.Inc() next = pp.Init(ev) case events.PostprocessingStepFinished: if ev.UploadID == "" { @@ -200,7 +209,9 @@ func (pps *PostprocessingService) processEvent(e raw.Event) error { } }) case events.UploadReady: + pps.metrics.InProgress.Dec() if ev.Failed { + pps.metrics.Finished.WithLabelValues("failed", string(pp.Status.Outcome)).Inc() // the upload failed - let's keep it around for a while - but mark it as finished pp, err = pps.getPP(pps.store, ev.UploadID) if err != nil { @@ -211,6 +222,7 @@ func (pps *PostprocessingService) processEvent(e raw.Event) error { return storePP(pps.store, pp) } + pps.metrics.Finished.WithLabelValues("succeeded").Inc() // the storage provider thinks the upload is done - so no need to keep it any more if err := pps.store.Delete(ev.UploadID); err != nil { pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot delete upload") @@ -360,3 +372,25 @@ func (pps *PostprocessingService) findUploadsByStep(step events.Postprocessingst return ids } + +func monitorMetrics(stream raw.Stream, name string, m *metrics.Metrics, logger log.Logger) { + ctx := context.Background() + consumer, err := stream.JetStream().Consumer(ctx, name) + if err != nil { + logger.Error().Err(err).Msg("failed to get consumer") + } + ticker := time.NewTicker(5 * time.Second) + go func() { + for range ticker.C { + info, err := consumer.Info(ctx) + if err != nil { + logger.Error().Err(err).Msg("failed to get consumer") + } + + m.EventsOutstandingAcks.Set(float64(info.NumAckPending)) + m.EventsUnprocessed.Set(float64(info.NumPending)) + m.EventsRedelivered.Set(float64(info.NumRedelivered)) + logger.Trace().Msg("updated postprocessing event metrics") + } + }() +} diff --git a/services/search/pkg/search/events.go b/services/search/pkg/search/events.go index eca39f94f..a02286d9c 100644 --- a/services/search/pkg/search/events.go +++ b/services/search/pkg/search/events.go @@ -127,7 +127,7 @@ func monitorMetrics(stream raw.Stream, name string, m *metrics.Metrics, logger l m.EventsOutstandingAcks.Set(float64(info.NumAckPending)) m.EventsUnprocessed.Set(float64(info.NumPending)) m.EventsRedelivered.Set(float64(info.NumRedelivered)) - logger.Debug().Msg("updated event metrics") + logger.Trace().Msg("updated search event metrics") } }() } From 791b4df173f87fbb1c2a76d9449d68007d2e5ad3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Tue, 8 Jul 2025 15:03:19 +0200 Subject: [PATCH 3/8] Collect metrics about search and index durations --- services/search/pkg/metrics/metrics.go | 30 ++++++++++++------ services/search/pkg/search/service.go | 31 +++++++++++++++++-- .../search/pkg/service/grpc/v0/service.go | 2 +- 3 files changed, 51 insertions(+), 12 deletions(-) diff --git a/services/search/pkg/metrics/metrics.go b/services/search/pkg/metrics/metrics.go index 437dc544c..7cd600fe5 100644 --- a/services/search/pkg/metrics/metrics.go +++ b/services/search/pkg/metrics/metrics.go @@ -2,6 +2,7 @@ package metrics import ( "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" ) var ( @@ -19,41 +20,52 @@ type Metrics struct { EventsOutstandingAcks prometheus.Gauge EventsUnprocessed prometheus.Gauge EventsRedelivered prometheus.Gauge + SearchDuration *prometheus.HistogramVec + IndexDuration *prometheus.HistogramVec } // New initializes the available metrics. func New() *Metrics { m := &Metrics{ - BuildInfo: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + BuildInfo: promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: Namespace, Subsystem: Subsystem, Name: "build_info", Help: "Build information", }, []string{"version"}), - EventsOutstandingAcks: prometheus.NewGauge(prometheus.GaugeOpts{ + EventsOutstandingAcks: promauto.NewGauge(prometheus.GaugeOpts{ Namespace: Namespace, Subsystem: Subsystem, Name: "events_outstanding_acks", Help: "Number of outstanding acks for events", }), - EventsUnprocessed: prometheus.NewGauge(prometheus.GaugeOpts{ + EventsUnprocessed: promauto.NewGauge(prometheus.GaugeOpts{ Namespace: Namespace, Subsystem: Subsystem, Name: "events_unprocessed", Help: "Number of unprocessed events", }), - EventsRedelivered: prometheus.NewGauge(prometheus.GaugeOpts{ + EventsRedelivered: promauto.NewGauge(prometheus.GaugeOpts{ Namespace: Namespace, Subsystem: Subsystem, Name: "events_redelivered", Help: "Number of redelivered events", }), + SearchDuration: promauto.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: Namespace, + Subsystem: Subsystem, + Name: "search_duration_seconds", + Help: "Duration of search operations in seconds", + Buckets: []float64{0.1, 0.5, 1, 2.5, 5, 10, 30, 60}, + }, []string{"status"}), + IndexDuration: promauto.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: Namespace, + Subsystem: Subsystem, + Name: "index_duration_seconds", + Help: "Duration of indexing operations in seconds", + Buckets: []float64{0.1, 0.5, 1, 2.5, 5, 10, 30, 60, 120, 300, 600, 1200}, + }, []string{"status"}), } - _ = prometheus.Register(m.BuildInfo) - _ = prometheus.Register(m.EventsOutstandingAcks) - _ = prometheus.Register(m.EventsUnprocessed) - _ = prometheus.Register(m.EventsRedelivered) - return m } diff --git a/services/search/pkg/search/service.go b/services/search/pkg/search/service.go index 95591db78..0ef9eb37f 100644 --- a/services/search/pkg/search/service.go +++ b/services/search/pkg/search/service.go @@ -14,6 +14,7 @@ import ( rpcv1beta1 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" collaborationv1beta1 "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + libregraph "github.com/opencloud-eu/libre-graph-api-go" revactx "github.com/opencloud-eu/reva/v2/pkg/ctx" "github.com/opencloud-eu/reva/v2/pkg/errtypes" "github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool" @@ -21,7 +22,6 @@ import ( "github.com/opencloud-eu/reva/v2/pkg/storage/utils/walker" "github.com/opencloud-eu/reva/v2/pkg/storagespace" "github.com/opencloud-eu/reva/v2/pkg/utils" - libregraph "github.com/opencloud-eu/libre-graph-api-go" "golang.org/x/sync/errgroup" "google.golang.org/protobuf/types/known/fieldmaskpb" @@ -31,6 +31,7 @@ import ( "github.com/opencloud-eu/opencloud/services/search/pkg/config" "github.com/opencloud-eu/opencloud/services/search/pkg/content" "github.com/opencloud-eu/opencloud/services/search/pkg/engine" + "github.com/opencloud-eu/opencloud/services/search/pkg/metrics" ) const ( @@ -59,6 +60,7 @@ type Service struct { gatewaySelector pool.Selectable[gateway.GatewayAPIClient] engine engine.Engine extractor content.Extractor + metrics *metrics.Metrics serviceAccountID string serviceAccountSecret string @@ -67,12 +69,13 @@ type Service struct { var errSkipSpace error // NewService creates a new Provider instance. -func NewService(gatewaySelector pool.Selectable[gateway.GatewayAPIClient], eng engine.Engine, extractor content.Extractor, logger log.Logger, cfg *config.Config) *Service { +func NewService(gatewaySelector pool.Selectable[gateway.GatewayAPIClient], eng engine.Engine, extractor content.Extractor, metrics *metrics.Metrics, logger log.Logger, cfg *config.Config) *Service { var s = &Service{ gatewaySelector: gatewaySelector, engine: eng, logger: logger, extractor: extractor, + metrics: metrics, serviceAccountID: cfg.ServiceAccount.ServiceAccountID, serviceAccountSecret: cfg.ServiceAccount.ServiceAccountSecret, @@ -85,6 +88,17 @@ func NewService(gatewaySelector pool.Selectable[gateway.GatewayAPIClient], eng e func (s *Service) Search(ctx context.Context, req *searchsvc.SearchRequest) (*searchsvc.SearchResponse, error) { s.logger.Debug().Str("query", req.Query).Msg("performing a search") + // collect metrics + startTime := time.Now() + success := false + defer func() { + status := "success" + if !success { + status = "error" + } + s.metrics.SearchDuration.WithLabelValues(status).Observe(time.Since(startTime).Seconds()) + }() + gatewayClient, err := s.gatewaySelector.Next() if err != nil { return nil, err @@ -255,6 +269,7 @@ func (s *Service) Search(ctx context.Context, req *searchsvc.SearchRequest) (*se matches = matches[0:limit] } + success = true return &searchsvc.SearchResponse{ Matches: matches, TotalMatches: total, @@ -425,6 +440,17 @@ func (s *Service) IndexSpace(spaceID *provider.StorageSpaceId) error { } rootID.OpaqueId = rootID.SpaceId + // Collect metrics + startTime := time.Now() + success := false + defer func() { + status := "success" + if !success { + status = "error" + } + s.metrics.IndexDuration.WithLabelValues(status).Observe(time.Since(startTime).Seconds()) + }() + w := walker.NewWalker(s.gatewaySelector) err = w.Walk(ownerCtx, &rootID, func(wd string, info *provider.ResourceInfo, err error) error { if err != nil { @@ -465,6 +491,7 @@ func (s *Service) IndexSpace(spaceID *provider.StorageSpaceId) error { } logDocCount(s.engine, s.logger) + success = true return nil } diff --git a/services/search/pkg/service/grpc/v0/service.go b/services/search/pkg/service/grpc/v0/service.go index bb410e564..a89efd4da 100644 --- a/services/search/pkg/service/grpc/v0/service.go +++ b/services/search/pkg/service/grpc/v0/service.go @@ -79,7 +79,7 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, func(), error) return nil, teardown, fmt.Errorf("unknown search extractor: %s", cfg.Extractor.Type) } - ss := search.NewService(selector, eng, extractor, logger, cfg) + ss := search.NewService(selector, eng, extractor, options.Metrics, logger, cfg) // setup event handling From 4704cedd3bce8676227858aff3ea0d1692fecec8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Thu, 10 Jul 2025 13:55:38 +0200 Subject: [PATCH 4/8] Add metrics for the postprocessing duration --- services/postprocessing/pkg/metrics/metrics.go | 8 ++++++++ .../pkg/postprocessing/postprocessing.go | 1 + services/postprocessing/pkg/service/service.go | 18 +++++++++++++----- 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/services/postprocessing/pkg/metrics/metrics.go b/services/postprocessing/pkg/metrics/metrics.go index 077a04018..4391a27d3 100644 --- a/services/postprocessing/pkg/metrics/metrics.go +++ b/services/postprocessing/pkg/metrics/metrics.go @@ -22,6 +22,7 @@ type Metrics struct { EventsRedelivered prometheus.Gauge InProgress prometheus.Gauge Finished *prometheus.CounterVec + Duration *prometheus.HistogramVec } // New initializes the available metrics. @@ -63,6 +64,13 @@ func New() *Metrics { Name: "finished", Help: "Number of finished postprocessing events", }, []string{"status"}), + Duration: promauto.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: Namespace, + Subsystem: Subsystem, + Name: "duration_seconds", + Help: "Duration of postprocessing operations in seconds", + Buckets: []float64{0.1, 0.5, 1, 2.5, 5, 10, 30, 60, 120, 300, 600, 1200}, + }, []string{"status"}), } return m diff --git a/services/postprocessing/pkg/postprocessing/postprocessing.go b/services/postprocessing/pkg/postprocessing/postprocessing.go index eac1a5b51..bc6db76cb 100644 --- a/services/postprocessing/pkg/postprocessing/postprocessing.go +++ b/services/postprocessing/pkg/postprocessing/postprocessing.go @@ -24,6 +24,7 @@ type Postprocessing struct { Failures int InitiatorID string Finished bool + StartTime time.Time config config.Postprocessing } diff --git a/services/postprocessing/pkg/service/service.go b/services/postprocessing/pkg/service/service.go index c9ca40d3f..768bece9a 100644 --- a/services/postprocessing/pkg/service/service.go +++ b/services/postprocessing/pkg/service/service.go @@ -157,6 +157,7 @@ func (pps *PostprocessingService) processEvent(e raw.Event) error { Steps: pps.steps, InitiatorID: e.InitiatorID, ImpersonatingUser: ev.ImpersonatingUser, + StartTime: time.Now(), } pps.metrics.InProgress.Inc() next = pp.Init(ev) @@ -210,19 +211,26 @@ func (pps *PostprocessingService) processEvent(e raw.Event) error { }) case events.UploadReady: pps.metrics.InProgress.Dec() + // the upload failed - let's keep it around for a while - but mark it as finished + pp, err = pps.getPP(pps.store, ev.UploadID) + if err != nil { + pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload") + return fmt.Errorf("%w: cannot get upload", ErrEvent) + } + if ev.Failed { pps.metrics.Finished.WithLabelValues("failed", string(pp.Status.Outcome)).Inc() - // the upload failed - let's keep it around for a while - but mark it as finished - pp, err = pps.getPP(pps.store, ev.UploadID) - if err != nil { - pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload") - return fmt.Errorf("%w: cannot get upload", ErrEvent) + if !pp.StartTime.IsZero() { + pps.metrics.Duration.WithLabelValues("failed").Observe(time.Since(pp.StartTime).Seconds()) } pp.Finished = true return storePP(pps.store, pp) } pps.metrics.Finished.WithLabelValues("succeeded").Inc() + if !pp.StartTime.IsZero() { + pps.metrics.Duration.WithLabelValues("succeeded").Observe(time.Since(pp.StartTime).Seconds()) + } // the storage provider thinks the upload is done - so no need to keep it any more if err := pps.store.Delete(ev.UploadID); err != nil { pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot delete upload") From 8adf425b2d4d202431f7f2bc28519e7862a2d503 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Thu, 17 Jul 2025 15:12:09 +0200 Subject: [PATCH 5/8] Bump reva --- go.mod | 6 +- go.sum | 12 ++-- .../golang-jwt/jwt/v5/MIGRATION_GUIDE.md | 2 +- vendor/github.com/golang-jwt/jwt/v5/ecdsa.go | 4 +- .../golang-jwt/jwt/v5/ecdsa_utils.go | 4 +- .../github.com/golang-jwt/jwt/v5/ed25519.go | 4 +- .../golang-jwt/jwt/v5/ed25519_utils.go | 4 +- vendor/github.com/golang-jwt/jwt/v5/hmac.go | 4 +- .../golang-jwt/jwt/v5/map_claims.go | 8 +-- vendor/github.com/golang-jwt/jwt/v5/none.go | 4 +- .../golang-jwt/jwt/v5/parser_option.go | 25 +++++-- vendor/github.com/golang-jwt/jwt/v5/rsa.go | 4 +- .../github.com/golang-jwt/jwt/v5/rsa_pss.go | 4 +- .../github.com/golang-jwt/jwt/v5/rsa_utils.go | 6 +- .../golang-jwt/jwt/v5/signing_method.go | 6 +- vendor/github.com/golang-jwt/jwt/v5/token.go | 20 +++--- vendor/github.com/golang-jwt/jwt/v5/types.go | 4 +- .../github.com/golang-jwt/jwt/v5/validator.go | 50 ++++++++------ .../v2/internal/http/interceptors/log/log.go | 5 ++ .../reva/v2/pkg/events/raw/mocks/Stream.go | 49 +++++++++++++ .../reva/v2/pkg/events/raw/raw.go | 11 ++- .../grpc/balancer/balancer.go | 8 ++- .../endpointsharding/endpointsharding.go | 36 +++++++--- .../pickfirst/pickfirstleaf/pickfirstleaf.go | 27 +------- .../grpc/balancer/roundrobin/roundrobin.go | 7 -- vendor/google.golang.org/grpc/dialoptions.go | 21 ++++++ .../health/grpc_health_v1/health_grpc.pb.go | 6 +- .../balancer/gracefulswitch/gracefulswitch.go | 10 +-- .../grpc/internal/envconfig/envconfig.go | 4 -- .../grpc/internal/transport/controlbuf.go | 68 +++++++++++++------ .../grpc/internal/transport/http2_client.go | 20 ++---- .../grpc/internal/transport/http2_server.go | 44 +++++++----- .../grpc/internal/transport/http_util.go | 3 - .../grpc/internal/transport/transport.go | 3 + .../grpc/mem/buffer_slice.go | 11 +++ .../grpc_reflection_v1/reflection_grpc.pb.go | 2 +- .../reflection_grpc.pb.go | 2 +- vendor/google.golang.org/grpc/server.go | 26 +++++++ vendor/google.golang.org/grpc/stream.go | 4 +- vendor/google.golang.org/grpc/version.go | 2 +- vendor/modules.txt | 6 +- 41 files changed, 351 insertions(+), 195 deletions(-) diff --git a/go.mod b/go.mod index 1a76a9e10..bba05e55b 100644 --- a/go.mod +++ b/go.mod @@ -35,7 +35,7 @@ require ( github.com/go-micro/plugins/v4/wrapper/trace/opentelemetry v1.2.0 github.com/go-playground/validator/v10 v10.27.0 github.com/gofrs/uuid v4.4.0+incompatible - github.com/golang-jwt/jwt/v5 v5.2.2 + github.com/golang-jwt/jwt/v5 v5.2.3 github.com/golang/protobuf v1.5.4 github.com/google/go-cmp v0.7.0 github.com/google/go-tika v0.3.1 @@ -64,7 +64,7 @@ require ( github.com/onsi/gomega v1.37.0 github.com/open-policy-agent/opa v1.6.0 github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250707143759-32eaae12b2ce - github.com/opencloud-eu/reva/v2 v2.34.1-0.20250716074813-cfe225225b23 + github.com/opencloud-eu/reva/v2 v2.34.1-0.20250717130558-81c1803fb574 github.com/orcaman/concurrent-map v1.0.0 github.com/pkg/errors v0.9.1 github.com/pkg/xattr v0.4.12 @@ -106,7 +106,7 @@ require ( golang.org/x/term v0.33.0 golang.org/x/text v0.27.0 google.golang.org/genproto/googleapis/api v0.0.0-20250603155806-513f23925822 - google.golang.org/grpc v1.73.0 + google.golang.org/grpc v1.74.0 google.golang.org/protobuf v1.36.6 gopkg.in/yaml.v2 v2.4.0 gotest.tools/v3 v3.5.2 diff --git a/go.sum b/go.sum index 16efc0760..e2cf04272 100644 --- a/go.sum +++ b/go.sum @@ -459,8 +459,8 @@ github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69 github.com/goji/httpauth v0.0.0-20160601135302-2da839ab0f4d/go.mod h1:nnjvkQ9ptGaCkuDUx6wNykzzlUixGxvkme+H/lnzb+A= github.com/golang-jwt/jwt/v4 v4.5.2 h1:YtQM7lnr8iZ+j5q71MGKkNw9Mn7AjHM68uc9g5fXeUI= github.com/golang-jwt/jwt/v4 v4.5.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= -github.com/golang-jwt/jwt/v5 v5.2.2 h1:Rl4B7itRWVtYIHFrSNd7vhTiz9UpLdi6gZhZ3wEeDy8= -github.com/golang-jwt/jwt/v5 v5.2.2/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/golang-jwt/jwt/v5 v5.2.3 h1:kkGXqQOBSDDWRhWNXTFpqGSCMyh/PLnqUvMGJPDJDs0= +github.com/golang-jwt/jwt/v5 v5.2.3/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -868,8 +868,8 @@ github.com/opencloud-eu/go-micro-plugins/v4/store/nats-js-kv v0.0.0-202505121527 github.com/opencloud-eu/go-micro-plugins/v4/store/nats-js-kv v0.0.0-20250512152754-23325793059a/go.mod h1:pjcozWijkNPbEtX5SIQaxEW/h8VAVZYTLx+70bmB3LY= github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250707143759-32eaae12b2ce h1:tjbIYsW5CFsEbCf5B/KN0Mo1oKU/K+oipgFm2B6wzG4= github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250707143759-32eaae12b2ce/go.mod h1:pzatilMEHZFT3qV7C/X3MqOa3NlRQuYhlRhZTL+hN6Q= -github.com/opencloud-eu/reva/v2 v2.34.1-0.20250716074813-cfe225225b23 h1:FY6l12zi57efPXe9kVU1U6FB6HMuAV/t0XJPEU2XVDw= -github.com/opencloud-eu/reva/v2 v2.34.1-0.20250716074813-cfe225225b23/go.mod h1:5Zur6s3GoCbhdU09voU8EO+Ls71NiHgWYmhcvmngjwY= +github.com/opencloud-eu/reva/v2 v2.34.1-0.20250717130558-81c1803fb574 h1:sx7eqVOdc1i+fGk6f3yoAdtPE1vYzAtBchT9kVjAnTk= +github.com/opencloud-eu/reva/v2 v2.34.1-0.20250717130558-81c1803fb574/go.mod h1:UVPwuMjfgPekuh7unWavJSiPihgmk1GYF3xct0q3+X0= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= @@ -1624,8 +1624,8 @@ google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3Iji google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= -google.golang.org/grpc v1.73.0 h1:VIWSmpI2MegBtTuFt5/JWy2oXxtjJ/e89Z70ImfD2ok= -google.golang.org/grpc v1.73.0/go.mod h1:50sbHOUqWoCQGI8V2HQLJM0B+LMlIUjNSZmow7EVBQc= +google.golang.org/grpc v1.74.0 h1:sxRSkyLxlceWQiqDofxDot3d4u7DyoHPc7SBXMj8gGY= +google.golang.org/grpc v1.74.0/go.mod h1:NZUaK8dAMUfzhK6uxZ+9511LtOrk73UGWOFoNvz7z+s= google.golang.org/grpc/examples v0.0.0-20211102180624-670c133e568e h1:m7aQHHqd0q89mRwhwS9Bx2rjyl/hsFAeta+uGrHsQaU= google.golang.org/grpc/examples v0.0.0-20211102180624-670c133e568e/go.mod h1:gID3PKrg7pWKntu9Ss6zTLJ0ttC0X9IHgREOCZwbCVU= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= diff --git a/vendor/github.com/golang-jwt/jwt/v5/MIGRATION_GUIDE.md b/vendor/github.com/golang-jwt/jwt/v5/MIGRATION_GUIDE.md index ff9c57e1d..b3178e751 100644 --- a/vendor/github.com/golang-jwt/jwt/v5/MIGRATION_GUIDE.md +++ b/vendor/github.com/golang-jwt/jwt/v5/MIGRATION_GUIDE.md @@ -155,7 +155,7 @@ stored in base64 encoded form, which was redundant with the information in the type Token struct { Raw string // Raw contains the raw token Method SigningMethod // Method is the signing method used or to be used - Header map[string]interface{} // Header is the first segment of the token in decoded form + Header map[string]any // Header is the first segment of the token in decoded form Claims Claims // Claims is the second segment of the token in decoded form Signature []byte // Signature is the third segment of the token in decoded form Valid bool // Valid specifies if the token is valid diff --git a/vendor/github.com/golang-jwt/jwt/v5/ecdsa.go b/vendor/github.com/golang-jwt/jwt/v5/ecdsa.go index c929e4a02..06cd94d23 100644 --- a/vendor/github.com/golang-jwt/jwt/v5/ecdsa.go +++ b/vendor/github.com/golang-jwt/jwt/v5/ecdsa.go @@ -55,7 +55,7 @@ func (m *SigningMethodECDSA) Alg() string { // Verify implements token verification for the SigningMethod. // For this verify method, key must be an ecdsa.PublicKey struct -func (m *SigningMethodECDSA) Verify(signingString string, sig []byte, key interface{}) error { +func (m *SigningMethodECDSA) Verify(signingString string, sig []byte, key any) error { // Get the key var ecdsaKey *ecdsa.PublicKey switch k := key.(type) { @@ -89,7 +89,7 @@ func (m *SigningMethodECDSA) Verify(signingString string, sig []byte, key interf // Sign implements token signing for the SigningMethod. // For this signing method, key must be an ecdsa.PrivateKey struct -func (m *SigningMethodECDSA) Sign(signingString string, key interface{}) ([]byte, error) { +func (m *SigningMethodECDSA) Sign(signingString string, key any) ([]byte, error) { // Get the key var ecdsaKey *ecdsa.PrivateKey switch k := key.(type) { diff --git a/vendor/github.com/golang-jwt/jwt/v5/ecdsa_utils.go b/vendor/github.com/golang-jwt/jwt/v5/ecdsa_utils.go index 5700636d3..44a3b7a1c 100644 --- a/vendor/github.com/golang-jwt/jwt/v5/ecdsa_utils.go +++ b/vendor/github.com/golang-jwt/jwt/v5/ecdsa_utils.go @@ -23,7 +23,7 @@ func ParseECPrivateKeyFromPEM(key []byte) (*ecdsa.PrivateKey, error) { } // Parse the key - var parsedKey interface{} + var parsedKey any if parsedKey, err = x509.ParseECPrivateKey(block.Bytes); err != nil { if parsedKey, err = x509.ParsePKCS8PrivateKey(block.Bytes); err != nil { return nil, err @@ -50,7 +50,7 @@ func ParseECPublicKeyFromPEM(key []byte) (*ecdsa.PublicKey, error) { } // Parse the key - var parsedKey interface{} + var parsedKey any if parsedKey, err = x509.ParsePKIXPublicKey(block.Bytes); err != nil { if cert, err := x509.ParseCertificate(block.Bytes); err == nil { parsedKey = cert.PublicKey diff --git a/vendor/github.com/golang-jwt/jwt/v5/ed25519.go b/vendor/github.com/golang-jwt/jwt/v5/ed25519.go index c2138119e..4159e57bf 100644 --- a/vendor/github.com/golang-jwt/jwt/v5/ed25519.go +++ b/vendor/github.com/golang-jwt/jwt/v5/ed25519.go @@ -33,7 +33,7 @@ func (m *SigningMethodEd25519) Alg() string { // Verify implements token verification for the SigningMethod. // For this verify method, key must be an ed25519.PublicKey -func (m *SigningMethodEd25519) Verify(signingString string, sig []byte, key interface{}) error { +func (m *SigningMethodEd25519) Verify(signingString string, sig []byte, key any) error { var ed25519Key ed25519.PublicKey var ok bool @@ -55,7 +55,7 @@ func (m *SigningMethodEd25519) Verify(signingString string, sig []byte, key inte // Sign implements token signing for the SigningMethod. // For this signing method, key must be an ed25519.PrivateKey -func (m *SigningMethodEd25519) Sign(signingString string, key interface{}) ([]byte, error) { +func (m *SigningMethodEd25519) Sign(signingString string, key any) ([]byte, error) { var ed25519Key crypto.Signer var ok bool diff --git a/vendor/github.com/golang-jwt/jwt/v5/ed25519_utils.go b/vendor/github.com/golang-jwt/jwt/v5/ed25519_utils.go index cdb5e68e8..6f46e8860 100644 --- a/vendor/github.com/golang-jwt/jwt/v5/ed25519_utils.go +++ b/vendor/github.com/golang-jwt/jwt/v5/ed25519_utils.go @@ -24,7 +24,7 @@ func ParseEdPrivateKeyFromPEM(key []byte) (crypto.PrivateKey, error) { } // Parse the key - var parsedKey interface{} + var parsedKey any if parsedKey, err = x509.ParsePKCS8PrivateKey(block.Bytes); err != nil { return nil, err } @@ -49,7 +49,7 @@ func ParseEdPublicKeyFromPEM(key []byte) (crypto.PublicKey, error) { } // Parse the key - var parsedKey interface{} + var parsedKey any if parsedKey, err = x509.ParsePKIXPublicKey(block.Bytes); err != nil { return nil, err } diff --git a/vendor/github.com/golang-jwt/jwt/v5/hmac.go b/vendor/github.com/golang-jwt/jwt/v5/hmac.go index aca600ce1..1bef138c3 100644 --- a/vendor/github.com/golang-jwt/jwt/v5/hmac.go +++ b/vendor/github.com/golang-jwt/jwt/v5/hmac.go @@ -55,7 +55,7 @@ func (m *SigningMethodHMAC) Alg() string { // about this, and why we intentionally are not supporting string as a key can // be found on our usage guide // https://golang-jwt.github.io/jwt/usage/signing_methods/#signing-methods-and-key-types. -func (m *SigningMethodHMAC) Verify(signingString string, sig []byte, key interface{}) error { +func (m *SigningMethodHMAC) Verify(signingString string, sig []byte, key any) error { // Verify the key is the right type keyBytes, ok := key.([]byte) if !ok { @@ -88,7 +88,7 @@ func (m *SigningMethodHMAC) Verify(signingString string, sig []byte, key interfa // cryptographically random source, e.g. crypto/rand. Additional information // about this, and why we intentionally are not supporting string as a key can // be found on our usage guide https://golang-jwt.github.io/jwt/usage/signing_methods/. -func (m *SigningMethodHMAC) Sign(signingString string, key interface{}) ([]byte, error) { +func (m *SigningMethodHMAC) Sign(signingString string, key any) ([]byte, error) { if keyBytes, ok := key.([]byte); ok { if !m.Hash.Available() { return nil, ErrHashUnavailable diff --git a/vendor/github.com/golang-jwt/jwt/v5/map_claims.go b/vendor/github.com/golang-jwt/jwt/v5/map_claims.go index b2b51a1f8..3b9205272 100644 --- a/vendor/github.com/golang-jwt/jwt/v5/map_claims.go +++ b/vendor/github.com/golang-jwt/jwt/v5/map_claims.go @@ -5,9 +5,9 @@ import ( "fmt" ) -// MapClaims is a claims type that uses the map[string]interface{} for JSON +// MapClaims is a claims type that uses the map[string]any for JSON // decoding. This is the default claims type if you don't supply one -type MapClaims map[string]interface{} +type MapClaims map[string]any // GetExpirationTime implements the Claims interface. func (m MapClaims) GetExpirationTime() (*NumericDate, error) { @@ -73,7 +73,7 @@ func (m MapClaims) parseClaimsString(key string) (ClaimStrings, error) { cs = append(cs, v) case []string: cs = v - case []interface{}: + case []any: for _, a := range v { vs, ok := a.(string) if !ok { @@ -92,7 +92,7 @@ func (m MapClaims) parseClaimsString(key string) (ClaimStrings, error) { func (m MapClaims) parseString(key string) (string, error) { var ( ok bool - raw interface{} + raw any iss string ) raw, ok = m[key] diff --git a/vendor/github.com/golang-jwt/jwt/v5/none.go b/vendor/github.com/golang-jwt/jwt/v5/none.go index 685c2ea30..624ad55e8 100644 --- a/vendor/github.com/golang-jwt/jwt/v5/none.go +++ b/vendor/github.com/golang-jwt/jwt/v5/none.go @@ -25,7 +25,7 @@ func (m *signingMethodNone) Alg() string { } // Only allow 'none' alg type if UnsafeAllowNoneSignatureType is specified as the key -func (m *signingMethodNone) Verify(signingString string, sig []byte, key interface{}) (err error) { +func (m *signingMethodNone) Verify(signingString string, sig []byte, key any) (err error) { // Key must be UnsafeAllowNoneSignatureType to prevent accidentally // accepting 'none' signing method if _, ok := key.(unsafeNoneMagicConstant); !ok { @@ -41,7 +41,7 @@ func (m *signingMethodNone) Verify(signingString string, sig []byte, key interfa } // Only allow 'none' signing if UnsafeAllowNoneSignatureType is specified as the key -func (m *signingMethodNone) Sign(signingString string, key interface{}) ([]byte, error) { +func (m *signingMethodNone) Sign(signingString string, key any) ([]byte, error) { if _, ok := key.(unsafeNoneMagicConstant); ok { return []byte{}, nil } diff --git a/vendor/github.com/golang-jwt/jwt/v5/parser_option.go b/vendor/github.com/golang-jwt/jwt/v5/parser_option.go index 88a780fbd..431573557 100644 --- a/vendor/github.com/golang-jwt/jwt/v5/parser_option.go +++ b/vendor/github.com/golang-jwt/jwt/v5/parser_option.go @@ -66,20 +66,37 @@ func WithExpirationRequired() ParserOption { } } -// WithAudience configures the validator to require the specified audience in -// the `aud` claim. Validation will fail if the audience is not listed in the -// token or the `aud` claim is missing. +// WithAudience configures the validator to require any of the specified +// audiences in the `aud` claim. Validation will fail if the audience is not +// listed in the token or the `aud` claim is missing. // // NOTE: While the `aud` claim is OPTIONAL in a JWT, the handling of it is // application-specific. Since this validation API is helping developers in // writing secure application, we decided to REQUIRE the existence of the claim, // if an audience is expected. -func WithAudience(aud string) ParserOption { +func WithAudience(aud ...string) ParserOption { return func(p *Parser) { p.validator.expectedAud = aud } } +// WithAllAudiences configures the validator to require all the specified +// audiences in the `aud` claim. Validation will fail if the specified audiences +// are not listed in the token or the `aud` claim is missing. Duplicates within +// the list are de-duplicated since internally, we use a map to look up the +// audiences. +// +// NOTE: While the `aud` claim is OPTIONAL in a JWT, the handling of it is +// application-specific. Since this validation API is helping developers in +// writing secure application, we decided to REQUIRE the existence of the claim, +// if an audience is expected. +func WithAllAudiences(aud ...string) ParserOption { + return func(p *Parser) { + p.validator.expectedAud = aud + p.validator.expectAllAud = true + } +} + // WithIssuer configures the validator to require the specified issuer in the // `iss` claim. Validation will fail if a different issuer is specified in the // token or the `iss` claim is missing. diff --git a/vendor/github.com/golang-jwt/jwt/v5/rsa.go b/vendor/github.com/golang-jwt/jwt/v5/rsa.go index 83cbee6ae..98b960a78 100644 --- a/vendor/github.com/golang-jwt/jwt/v5/rsa.go +++ b/vendor/github.com/golang-jwt/jwt/v5/rsa.go @@ -46,7 +46,7 @@ func (m *SigningMethodRSA) Alg() string { // Verify implements token verification for the SigningMethod // For this signing method, must be an *rsa.PublicKey structure. -func (m *SigningMethodRSA) Verify(signingString string, sig []byte, key interface{}) error { +func (m *SigningMethodRSA) Verify(signingString string, sig []byte, key any) error { var rsaKey *rsa.PublicKey var ok bool @@ -67,7 +67,7 @@ func (m *SigningMethodRSA) Verify(signingString string, sig []byte, key interfac // Sign implements token signing for the SigningMethod // For this signing method, must be an *rsa.PrivateKey structure. -func (m *SigningMethodRSA) Sign(signingString string, key interface{}) ([]byte, error) { +func (m *SigningMethodRSA) Sign(signingString string, key any) ([]byte, error) { var rsaKey *rsa.PrivateKey var ok bool diff --git a/vendor/github.com/golang-jwt/jwt/v5/rsa_pss.go b/vendor/github.com/golang-jwt/jwt/v5/rsa_pss.go index 28c386ec4..7c216ae00 100644 --- a/vendor/github.com/golang-jwt/jwt/v5/rsa_pss.go +++ b/vendor/github.com/golang-jwt/jwt/v5/rsa_pss.go @@ -82,7 +82,7 @@ func init() { // Verify implements token verification for the SigningMethod. // For this verify method, key must be an rsa.PublicKey struct -func (m *SigningMethodRSAPSS) Verify(signingString string, sig []byte, key interface{}) error { +func (m *SigningMethodRSAPSS) Verify(signingString string, sig []byte, key any) error { var rsaKey *rsa.PublicKey switch k := key.(type) { case *rsa.PublicKey: @@ -108,7 +108,7 @@ func (m *SigningMethodRSAPSS) Verify(signingString string, sig []byte, key inter // Sign implements token signing for the SigningMethod. // For this signing method, key must be an rsa.PrivateKey struct -func (m *SigningMethodRSAPSS) Sign(signingString string, key interface{}) ([]byte, error) { +func (m *SigningMethodRSAPSS) Sign(signingString string, key any) ([]byte, error) { var rsaKey *rsa.PrivateKey switch k := key.(type) { diff --git a/vendor/github.com/golang-jwt/jwt/v5/rsa_utils.go b/vendor/github.com/golang-jwt/jwt/v5/rsa_utils.go index b3aeebbe1..f22c3d068 100644 --- a/vendor/github.com/golang-jwt/jwt/v5/rsa_utils.go +++ b/vendor/github.com/golang-jwt/jwt/v5/rsa_utils.go @@ -23,7 +23,7 @@ func ParseRSAPrivateKeyFromPEM(key []byte) (*rsa.PrivateKey, error) { return nil, ErrKeyMustBePEMEncoded } - var parsedKey interface{} + var parsedKey any if parsedKey, err = x509.ParsePKCS1PrivateKey(block.Bytes); err != nil { if parsedKey, err = x509.ParsePKCS8PrivateKey(block.Bytes); err != nil { return nil, err @@ -53,7 +53,7 @@ func ParseRSAPrivateKeyFromPEMWithPassword(key []byte, password string) (*rsa.Pr return nil, ErrKeyMustBePEMEncoded } - var parsedKey interface{} + var parsedKey any var blockDecrypted []byte if blockDecrypted, err = x509.DecryptPEMBlock(block, []byte(password)); err != nil { @@ -86,7 +86,7 @@ func ParseRSAPublicKeyFromPEM(key []byte) (*rsa.PublicKey, error) { } // Parse the key - var parsedKey interface{} + var parsedKey any if parsedKey, err = x509.ParsePKIXPublicKey(block.Bytes); err != nil { if cert, err := x509.ParseCertificate(block.Bytes); err == nil { parsedKey = cert.PublicKey diff --git a/vendor/github.com/golang-jwt/jwt/v5/signing_method.go b/vendor/github.com/golang-jwt/jwt/v5/signing_method.go index 0d73631c1..096d0ed4c 100644 --- a/vendor/github.com/golang-jwt/jwt/v5/signing_method.go +++ b/vendor/github.com/golang-jwt/jwt/v5/signing_method.go @@ -12,9 +12,9 @@ var signingMethodLock = new(sync.RWMutex) // signature in Sign. The signature is then usually base64 encoded as part of a // JWT. type SigningMethod interface { - Verify(signingString string, sig []byte, key interface{}) error // Returns nil if signature is valid - Sign(signingString string, key interface{}) ([]byte, error) // Returns signature or error - Alg() string // returns the alg identifier for this method (example: 'HS256') + Verify(signingString string, sig []byte, key any) error // Returns nil if signature is valid + Sign(signingString string, key any) ([]byte, error) // Returns signature or error + Alg() string // returns the alg identifier for this method (example: 'HS256') } // RegisterSigningMethod registers the "alg" name and a factory function for signing method. diff --git a/vendor/github.com/golang-jwt/jwt/v5/token.go b/vendor/github.com/golang-jwt/jwt/v5/token.go index 9c7f4ab01..3f7155888 100644 --- a/vendor/github.com/golang-jwt/jwt/v5/token.go +++ b/vendor/github.com/golang-jwt/jwt/v5/token.go @@ -11,9 +11,9 @@ import ( // Token. This allows you to use properties in the Header of the token (such as // `kid`) to identify which key to use. // -// The returned interface{} may be a single key or a VerificationKeySet containing +// The returned any may be a single key or a VerificationKeySet containing // multiple keys. -type Keyfunc func(*Token) (interface{}, error) +type Keyfunc func(*Token) (any, error) // VerificationKey represents a public or secret key for verifying a token's signature. type VerificationKey interface { @@ -28,12 +28,12 @@ type VerificationKeySet struct { // Token represents a JWT Token. Different fields will be used depending on // whether you're creating or parsing/verifying a token. type Token struct { - Raw string // Raw contains the raw token. Populated when you [Parse] a token - Method SigningMethod // Method is the signing method used or to be used - Header map[string]interface{} // Header is the first segment of the token in decoded form - Claims Claims // Claims is the second segment of the token in decoded form - Signature []byte // Signature is the third segment of the token in decoded form. Populated when you Parse a token - Valid bool // Valid specifies if the token is valid. Populated when you Parse/Verify a token + Raw string // Raw contains the raw token. Populated when you [Parse] a token + Method SigningMethod // Method is the signing method used or to be used + Header map[string]any // Header is the first segment of the token in decoded form + Claims Claims // Claims is the second segment of the token in decoded form + Signature []byte // Signature is the third segment of the token in decoded form. Populated when you Parse a token + Valid bool // Valid specifies if the token is valid. Populated when you Parse/Verify a token } // New creates a new [Token] with the specified signing method and an empty map @@ -46,7 +46,7 @@ func New(method SigningMethod, opts ...TokenOption) *Token { // claims. Additional options can be specified, but are currently unused. func NewWithClaims(method SigningMethod, claims Claims, opts ...TokenOption) *Token { return &Token{ - Header: map[string]interface{}{ + Header: map[string]any{ "typ": "JWT", "alg": method.Alg(), }, @@ -60,7 +60,7 @@ func NewWithClaims(method SigningMethod, claims Claims, opts ...TokenOption) *To // https://golang-jwt.github.io/jwt/usage/signing_methods/#signing-methods-and-key-types // for an overview of the different signing methods and their respective key // types. -func (t *Token) SignedString(key interface{}) (string, error) { +func (t *Token) SignedString(key any) (string, error) { sstr, err := t.SigningString() if err != nil { return "", err diff --git a/vendor/github.com/golang-jwt/jwt/v5/types.go b/vendor/github.com/golang-jwt/jwt/v5/types.go index b2655a9e6..a3e0ef121 100644 --- a/vendor/github.com/golang-jwt/jwt/v5/types.go +++ b/vendor/github.com/golang-jwt/jwt/v5/types.go @@ -103,7 +103,7 @@ func (date *NumericDate) UnmarshalJSON(b []byte) (err error) { type ClaimStrings []string func (s *ClaimStrings) UnmarshalJSON(data []byte) (err error) { - var value interface{} + var value any if err = json.Unmarshal(data, &value); err != nil { return err @@ -116,7 +116,7 @@ func (s *ClaimStrings) UnmarshalJSON(data []byte) (err error) { aud = append(aud, v) case []string: aud = ClaimStrings(v) - case []interface{}: + case []any: for _, vv := range v { vs, ok := vv.(string) if !ok { diff --git a/vendor/github.com/golang-jwt/jwt/v5/validator.go b/vendor/github.com/golang-jwt/jwt/v5/validator.go index 008ecd871..92b5c057c 100644 --- a/vendor/github.com/golang-jwt/jwt/v5/validator.go +++ b/vendor/github.com/golang-jwt/jwt/v5/validator.go @@ -1,8 +1,8 @@ package jwt import ( - "crypto/subtle" "fmt" + "slices" "time" ) @@ -52,8 +52,12 @@ type Validator struct { verifyIat bool // expectedAud contains the audience this token expects. Supplying an empty - // string will disable aud checking. - expectedAud string + // slice will disable aud checking. + expectedAud []string + + // expectAllAud specifies whether all expected audiences must be present in + // the token. If false, only one of the expected audiences must be present. + expectAllAud bool // expectedIss contains the issuer this token expects. Supplying an empty // string will disable iss checking. @@ -88,7 +92,7 @@ func NewValidator(opts ...ParserOption) *Validator { func (v *Validator) Validate(claims Claims) error { var ( now time.Time - errs []error = make([]error, 0, 6) + errs = make([]error, 0, 6) err error ) @@ -120,8 +124,8 @@ func (v *Validator) Validate(claims Claims) error { } // If we have an expected audience, we also require the audience claim - if v.expectedAud != "" { - if err = v.verifyAudience(claims, v.expectedAud, true); err != nil { + if len(v.expectedAud) > 0 { + if err = v.verifyAudience(claims, v.expectedAud, v.expectAllAud); err != nil { errs = append(errs, err) } } @@ -226,33 +230,39 @@ func (v *Validator) verifyNotBefore(claims Claims, cmp time.Time, required bool) // // Additionally, if any error occurs while retrieving the claim, e.g., when its // the wrong type, an ErrTokenUnverifiable error will be returned. -func (v *Validator) verifyAudience(claims Claims, cmp string, required bool) error { +func (v *Validator) verifyAudience(claims Claims, cmp []string, expectAllAud bool) error { aud, err := claims.GetAudience() if err != nil { return err } - if len(aud) == 0 { + // Check that aud exists and is not empty. We only require the aud claim + // if we expect at least one audience to be present. + if len(aud) == 0 || len(aud) == 1 && aud[0] == "" { + required := len(v.expectedAud) > 0 return errorIfRequired(required, "aud") } - // use a var here to keep constant time compare when looping over a number of claims - result := false - - var stringClaims string - for _, a := range aud { - if subtle.ConstantTimeCompare([]byte(a), []byte(cmp)) != 0 { - result = true + if !expectAllAud { + for _, a := range aud { + // If we only expect one match, we can stop early if we find a match + if slices.Contains(cmp, a) { + return nil + } } - stringClaims = stringClaims + a + + return ErrTokenInvalidAudience } - // case where "" is sent in one or many aud claims - if stringClaims == "" { - return errorIfRequired(required, "aud") + // Note that we are looping cmp here to ensure that all expected audiences + // are present in the aud claim. + for _, a := range cmp { + if !slices.Contains(aud, a) { + return ErrTokenInvalidAudience + } } - return errorIfFalse(result, ErrTokenInvalidAudience) + return nil } // verifyIssuer compares the iss claim in claims against cmp. diff --git a/vendor/github.com/opencloud-eu/reva/v2/internal/http/interceptors/log/log.go b/vendor/github.com/opencloud-eu/reva/v2/internal/http/interceptors/log/log.go index 04d10106e..a1a04484a 100644 --- a/vendor/github.com/opencloud-eu/reva/v2/internal/http/interceptors/log/log.go +++ b/vendor/github.com/opencloud-eu/reva/v2/internal/http/interceptors/log/log.go @@ -130,6 +130,7 @@ type commonLoggingResponseWriter interface { http.Flusher Status() int Size() int + Unwrap() http.ResponseWriter } // responseLogger is wrapper of http.ResponseWriter that keeps track of its HTTP @@ -170,6 +171,10 @@ func (l *responseLogger) Flush() { } } +func (l *responseLogger) Unwrap() http.ResponseWriter { + return l.w +} + type hijackLogger struct { responseLogger } diff --git a/vendor/github.com/opencloud-eu/reva/v2/pkg/events/raw/mocks/Stream.go b/vendor/github.com/opencloud-eu/reva/v2/pkg/events/raw/mocks/Stream.go index b83bb7b6b..11a2a6174 100644 --- a/vendor/github.com/opencloud-eu/reva/v2/pkg/events/raw/mocks/Stream.go +++ b/vendor/github.com/opencloud-eu/reva/v2/pkg/events/raw/mocks/Stream.go @@ -21,7 +21,9 @@ package mocks import ( + jetstream "github.com/nats-io/nats.go/jetstream" events "github.com/opencloud-eu/reva/v2/pkg/events" + mock "github.com/stretchr/testify/mock" raw "github.com/opencloud-eu/reva/v2/pkg/events/raw" @@ -112,6 +114,53 @@ func (_c *Stream_Consume_Call) RunAndReturn(run func(string, ...events.Unmarshal return _c } +// JetStream provides a mock function with no fields +func (_m *Stream) JetStream() jetstream.Stream { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for JetStream") + } + + var r0 jetstream.Stream + if rf, ok := ret.Get(0).(func() jetstream.Stream); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(jetstream.Stream) + } + } + + return r0 +} + +// Stream_JetStream_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'JetStream' +type Stream_JetStream_Call struct { + *mock.Call +} + +// JetStream is a helper method to define mock.On call +func (_e *Stream_Expecter) JetStream() *Stream_JetStream_Call { + return &Stream_JetStream_Call{Call: _e.mock.On("JetStream")} +} + +func (_c *Stream_JetStream_Call) Run(run func()) *Stream_JetStream_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *Stream_JetStream_Call) Return(_a0 jetstream.Stream) *Stream_JetStream_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Stream_JetStream_Call) RunAndReturn(run func() jetstream.Stream) *Stream_JetStream_Call { + _c.Call.Return(run) + return _c +} + // NewStream creates a new instance of Stream. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewStream(t interface { diff --git a/vendor/github.com/opencloud-eu/reva/v2/pkg/events/raw/raw.go b/vendor/github.com/opencloud-eu/reva/v2/pkg/events/raw/raw.go index 1d74d8105..5f24f45c1 100644 --- a/vendor/github.com/opencloud-eu/reva/v2/pkg/events/raw/raw.go +++ b/vendor/github.com/opencloud-eu/reva/v2/pkg/events/raw/raw.go @@ -62,10 +62,11 @@ func (re *Event) InProgress() error { type Stream interface { Consume(group string, evs ...events.Unmarshaller) (<-chan Event, error) + JetStream() jetstream.Stream } type RawStream struct { - Js jetstream.Stream + js jetstream.Stream c Config } @@ -130,7 +131,7 @@ func FromConfig(ctx context.Context, name string, cfg Config) (Stream, error) { } s = &RawStream{ - Js: js, + js: js, c: cfg, } return nil @@ -186,7 +187,7 @@ func (s *RawStream) Consume(group string, evs ...events.Unmarshaller) (<-chan Ev } func (s *RawStream) consumeRaw(group string) (<-chan RawEvent, error) { - consumer, err := s.Js.CreateOrUpdateConsumer(context.Background(), jetstream.ConsumerConfig{ + consumer, err := s.js.CreateOrUpdateConsumer(context.Background(), jetstream.ConsumerConfig{ Durable: group, DeliverPolicy: jetstream.DeliverNewPolicy, AckPolicy: jetstream.AckExplicitPolicy, // Require manual acknowledgment @@ -214,3 +215,7 @@ func (s *RawStream) consumeRaw(group string) (<-chan RawEvent, error) { return channel, nil } + +func (s *RawStream) JetStream() jetstream.Stream { + return s.js +} diff --git a/vendor/google.golang.org/grpc/balancer/balancer.go b/vendor/google.golang.org/grpc/balancer/balancer.go index c9b343c71..b1264017d 100644 --- a/vendor/google.golang.org/grpc/balancer/balancer.go +++ b/vendor/google.golang.org/grpc/balancer/balancer.go @@ -360,6 +360,10 @@ type Balancer interface { // call SubConn.Shutdown for its existing SubConns; however, this will be // required in a future release, so it is recommended. Close() + // ExitIdle instructs the LB policy to reconnect to backends / exit the + // IDLE state, if appropriate and possible. Note that SubConns that enter + // the IDLE state will not reconnect until SubConn.Connect is called. + ExitIdle() } // ExitIdler is an optional interface for balancers to implement. If @@ -367,8 +371,8 @@ type Balancer interface { // the ClientConn is idle. If unimplemented, ClientConn.Connect will cause // all SubConns to connect. // -// Notice: it will be required for all balancers to implement this in a future -// release. +// Deprecated: All balancers must implement this interface. This interface will +// be removed in a future release. type ExitIdler interface { // ExitIdle instructs the LB policy to reconnect to backends / exit the // IDLE state, if appropriate and possible. Note that SubConns that enter diff --git a/vendor/google.golang.org/grpc/balancer/endpointsharding/endpointsharding.go b/vendor/google.golang.org/grpc/balancer/endpointsharding/endpointsharding.go index cc606f4da..0ad6bb1f2 100644 --- a/vendor/google.golang.org/grpc/balancer/endpointsharding/endpointsharding.go +++ b/vendor/google.golang.org/grpc/balancer/endpointsharding/endpointsharding.go @@ -45,7 +45,15 @@ type ChildState struct { // Balancer exposes only the ExitIdler interface of the child LB policy. // Other methods of the child policy are called only by endpointsharding. - Balancer balancer.ExitIdler + Balancer ExitIdler +} + +// ExitIdler provides access to only the ExitIdle method of the child balancer. +type ExitIdler interface { + // ExitIdle instructs the LB policy to reconnect to backends / exit the + // IDLE state, if appropriate and possible. Note that SubConns that enter + // the IDLE state will not reconnect until SubConn.Connect is called. + ExitIdle() } // Options are the options to configure the behaviour of the @@ -205,6 +213,16 @@ func (es *endpointSharding) Close() { } } +func (es *endpointSharding) ExitIdle() { + es.childMu.Lock() + defer es.childMu.Unlock() + for _, bw := range es.children.Load().Values() { + if !bw.isClosed { + bw.child.ExitIdle() + } + } +} + // updateState updates this component's state. It sends the aggregated state, // and a picker with round robin behavior with all the child states present if // needed. @@ -326,15 +344,13 @@ func (bw *balancerWrapper) UpdateState(state balancer.State) { // ExitIdle pings an IDLE child balancer to exit idle in a new goroutine to // avoid deadlocks due to synchronous balancer state updates. func (bw *balancerWrapper) ExitIdle() { - if ei, ok := bw.child.(balancer.ExitIdler); ok { - go func() { - bw.es.childMu.Lock() - if !bw.isClosed { - ei.ExitIdle() - } - bw.es.childMu.Unlock() - }() - } + go func() { + bw.es.childMu.Lock() + if !bw.isClosed { + bw.child.ExitIdle() + } + bw.es.childMu.Unlock() + }() } // updateClientConnStateLocked delivers the ClientConnState to the child diff --git a/vendor/google.golang.org/grpc/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go b/vendor/google.golang.org/grpc/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go index 494314f23..e62047256 100644 --- a/vendor/google.golang.org/grpc/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go +++ b/vendor/google.golang.org/grpc/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go @@ -54,18 +54,9 @@ func init() { balancer.Register(pickfirstBuilder{}) } -type ( - // enableHealthListenerKeyType is a unique key type used in resolver - // attributes to indicate whether the health listener usage is enabled. - enableHealthListenerKeyType struct{} - // managedByPickfirstKeyType is an attribute key type to inform Outlier - // Detection that the generic health listener is being used. - // TODO: https://github.com/grpc/grpc-go/issues/7915 - Remove this when - // implementing the dualstack design. This is a hack. Once Dualstack is - // completed, outlier detection will stop sending ejection updates through - // the connectivity listener. - managedByPickfirstKeyType struct{} -) +// enableHealthListenerKeyType is a unique key type used in resolver +// attributes to indicate whether the health listener usage is enabled. +type enableHealthListenerKeyType struct{} var ( logger = grpclog.Component("pick-first-leaf-lb") @@ -149,17 +140,6 @@ func EnableHealthListener(state resolver.State) resolver.State { return state } -// IsManagedByPickfirst returns whether an address belongs to a SubConn -// managed by the pickfirst LB policy. -// TODO: https://github.com/grpc/grpc-go/issues/7915 - This is a hack to disable -// outlier_detection via the with connectivity listener when using pick_first. -// Once Dualstack changes are complete, all SubConns will be created by -// pick_first and outlier detection will only use the health listener for -// ejection. This hack can then be removed. -func IsManagedByPickfirst(addr resolver.Address) bool { - return addr.BalancerAttributes.Value(managedByPickfirstKeyType{}) != nil -} - type pfConfig struct { serviceconfig.LoadBalancingConfig `json:"-"` @@ -186,7 +166,6 @@ type scData struct { } func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) { - addr.BalancerAttributes = addr.BalancerAttributes.WithValue(managedByPickfirstKeyType{}, true) sd := &scData{ rawConnectivityState: connectivity.Idle, effectiveState: connectivity.Idle, diff --git a/vendor/google.golang.org/grpc/balancer/roundrobin/roundrobin.go b/vendor/google.golang.org/grpc/balancer/roundrobin/roundrobin.go index 35da5d1ec..22045bf39 100644 --- a/vendor/google.golang.org/grpc/balancer/roundrobin/roundrobin.go +++ b/vendor/google.golang.org/grpc/balancer/roundrobin/roundrobin.go @@ -70,10 +70,3 @@ func (b *rrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error { ResolverState: pickfirstleaf.EnableHealthListener(ccs.ResolverState), }) } - -func (b *rrBalancer) ExitIdle() { - // Should always be ok, as child is endpoint sharding. - if ei, ok := b.Balancer.(balancer.ExitIdler); ok { - ei.ExitIdle() - } -} diff --git a/vendor/google.golang.org/grpc/dialoptions.go b/vendor/google.golang.org/grpc/dialoptions.go index 050ba0f16..ec0ca89cc 100644 --- a/vendor/google.golang.org/grpc/dialoptions.go +++ b/vendor/google.golang.org/grpc/dialoptions.go @@ -213,6 +213,7 @@ func WithReadBufferSize(s int) DialOption { func WithInitialWindowSize(s int32) DialOption { return newFuncDialOption(func(o *dialOptions) { o.copts.InitialWindowSize = s + o.copts.StaticWindowSize = true }) } @@ -222,6 +223,26 @@ func WithInitialWindowSize(s int32) DialOption { func WithInitialConnWindowSize(s int32) DialOption { return newFuncDialOption(func(o *dialOptions) { o.copts.InitialConnWindowSize = s + o.copts.StaticWindowSize = true + }) +} + +// WithStaticStreamWindowSize returns a DialOption which sets the initial +// stream window size to the value provided and disables dynamic flow control. +func WithStaticStreamWindowSize(s int32) DialOption { + return newFuncDialOption(func(o *dialOptions) { + o.copts.InitialWindowSize = s + o.copts.StaticWindowSize = true + }) +} + +// WithStaticConnWindowSize returns a DialOption which sets the initial +// connection window size to the value provided and disables dynamic flow +// control. +func WithStaticConnWindowSize(s int32) DialOption { + return newFuncDialOption(func(o *dialOptions) { + o.copts.InitialConnWindowSize = s + o.copts.StaticWindowSize = true }) } diff --git a/vendor/google.golang.org/grpc/health/grpc_health_v1/health_grpc.pb.go b/vendor/google.golang.org/grpc/health/grpc_health_v1/health_grpc.pb.go index 93136610e..f2c01f296 100644 --- a/vendor/google.golang.org/grpc/health/grpc_health_v1/health_grpc.pb.go +++ b/vendor/google.golang.org/grpc/health/grpc_health_v1/health_grpc.pb.go @@ -188,13 +188,13 @@ type HealthServer interface { type UnimplementedHealthServer struct{} func (UnimplementedHealthServer) Check(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Check not implemented") + return nil, status.Error(codes.Unimplemented, "method Check not implemented") } func (UnimplementedHealthServer) List(context.Context, *HealthListRequest) (*HealthListResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method List not implemented") + return nil, status.Error(codes.Unimplemented, "method List not implemented") } func (UnimplementedHealthServer) Watch(*HealthCheckRequest, grpc.ServerStreamingServer[HealthCheckResponse]) error { - return status.Errorf(codes.Unimplemented, "method Watch not implemented") + return status.Error(codes.Unimplemented, "method Watch not implemented") } func (UnimplementedHealthServer) testEmbeddedByValue() {} diff --git a/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go b/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go index fbc1ca356..ba25b8988 100644 --- a/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go +++ b/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go @@ -223,15 +223,7 @@ func (gsb *Balancer) ExitIdle() { // There is no need to protect this read with a mutex, as the write to the // Balancer field happens in SwitchTo, which completes before this can be // called. - if ei, ok := balToUpdate.Balancer.(balancer.ExitIdler); ok { - ei.ExitIdle() - return - } - gsb.mu.Lock() - defer gsb.mu.Unlock() - for sc := range balToUpdate.subconns { - sc.Connect() - } + balToUpdate.ExitIdle() } // updateSubConnState forwards the update to the appropriate child. diff --git a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go index f5f2bdeb8..2fdaed88d 100644 --- a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go +++ b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go @@ -33,10 +33,6 @@ var ( // "GRPC_RING_HASH_CAP". This does not override the default bounds // checking which NACKs configs specifying ring sizes > 8*1024*1024 (~8M). RingHashCap = uint64FromEnv("GRPC_RING_HASH_CAP", 4096, 1, 8*1024*1024) - // LeastRequestLB is set if we should support the least_request_experimental - // LB policy, which can be enabled by setting the environment variable - // "GRPC_EXPERIMENTAL_ENABLE_LEAST_REQUEST" to "true". - LeastRequestLB = boolFromEnv("GRPC_EXPERIMENTAL_ENABLE_LEAST_REQUEST", true) // ALTSMaxConcurrentHandshakes is the maximum number of concurrent ALTS // handshakes that can be performed. ALTSMaxConcurrentHandshakes = uint64FromEnv("GRPC_ALTS_MAX_CONCURRENT_HANDSHAKES", 100, 1, 100) diff --git a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go index ef72fbb3a..a2831e5d0 100644 --- a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go +++ b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go @@ -40,6 +40,13 @@ var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) { e.SetMaxDynamicTableSizeLimit(v) } +// itemNodePool is used to reduce heap allocations. +var itemNodePool = sync.Pool{ + New: func() any { + return &itemNode{} + }, +} + type itemNode struct { it any next *itemNode @@ -51,7 +58,9 @@ type itemList struct { } func (il *itemList) enqueue(i any) { - n := &itemNode{it: i} + n := itemNodePool.Get().(*itemNode) + n.next = nil + n.it = i if il.tail == nil { il.head, il.tail = n, n return @@ -71,7 +80,9 @@ func (il *itemList) dequeue() any { return nil } i := il.head.it + temp := il.head il.head = il.head.next + itemNodePool.Put(temp) if il.head == nil { il.tail = nil } @@ -146,10 +157,11 @@ type earlyAbortStream struct { func (*earlyAbortStream) isTransportResponseFrame() bool { return false } type dataFrame struct { - streamID uint32 - endStream bool - h []byte - reader mem.Reader + streamID uint32 + endStream bool + h []byte + data mem.BufferSlice + processing bool // onEachWrite is called every time // a part of data is written out. onEachWrite func() @@ -234,6 +246,7 @@ type outStream struct { itl *itemList bytesOutStanding int wq *writeQuota + reader mem.Reader next *outStream prev *outStream @@ -461,7 +474,9 @@ func (c *controlBuffer) finish() { v.onOrphaned(ErrConnClosing) } case *dataFrame: - _ = v.reader.Close() + if !v.processing { + v.data.Free() + } } } @@ -650,10 +665,11 @@ func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error { func (l *loopyWriter) registerStreamHandler(h *registerStream) { str := &outStream{ - id: h.streamID, - state: empty, - itl: &itemList{}, - wq: h.wq, + id: h.streamID, + state: empty, + itl: &itemList{}, + wq: h.wq, + reader: mem.BufferSlice{}.Reader(), } l.estdStreams[h.streamID] = str } @@ -685,10 +701,11 @@ func (l *loopyWriter) headerHandler(h *headerFrame) error { } // Case 2: Client wants to originate stream. str := &outStream{ - id: h.streamID, - state: empty, - itl: &itemList{}, - wq: h.wq, + id: h.streamID, + state: empty, + itl: &itemList{}, + wq: h.wq, + reader: mem.BufferSlice{}.Reader(), } return l.originateStream(str, h) } @@ -790,10 +807,13 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error { // a RST_STREAM before stream initialization thus the stream might // not be established yet. delete(l.estdStreams, c.streamID) + str.reader.Close() str.deleteSelf() for head := str.itl.dequeueAll(); head != nil; head = head.next { if df, ok := head.it.(*dataFrame); ok { - _ = df.reader.Close() + if !df.processing { + df.data.Free() + } } } } @@ -928,7 +948,13 @@ func (l *loopyWriter) processData() (bool, error) { if str == nil { return true, nil } + reader := str.reader dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream. + if !dataItem.processing { + dataItem.processing = true + str.reader.Reset(dataItem.data) + dataItem.data.Free() + } // A data item is represented by a dataFrame, since it later translates into // multiple HTTP2 data frames. // Every dataFrame has two buffers; h that keeps grpc-message header and data @@ -936,13 +962,13 @@ func (l *loopyWriter) processData() (bool, error) { // from data is copied to h to make as big as the maximum possible HTTP2 frame // size. - if len(dataItem.h) == 0 && dataItem.reader.Remaining() == 0 { // Empty data frame + if len(dataItem.h) == 0 && reader.Remaining() == 0 { // Empty data frame // Client sends out empty data frame with endStream = true if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil { return false, err } str.itl.dequeue() // remove the empty data item from stream - _ = dataItem.reader.Close() + _ = reader.Close() if str.itl.isEmpty() { str.state = empty } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers. @@ -971,8 +997,8 @@ func (l *loopyWriter) processData() (bool, error) { } // Compute how much of the header and data we can send within quota and max frame length hSize := min(maxSize, len(dataItem.h)) - dSize := min(maxSize-hSize, dataItem.reader.Remaining()) - remainingBytes := len(dataItem.h) + dataItem.reader.Remaining() - hSize - dSize + dSize := min(maxSize-hSize, reader.Remaining()) + remainingBytes := len(dataItem.h) + reader.Remaining() - hSize - dSize size := hSize + dSize var buf *[]byte @@ -993,7 +1019,7 @@ func (l *loopyWriter) processData() (bool, error) { defer pool.Put(buf) copy((*buf)[:hSize], dataItem.h) - _, _ = dataItem.reader.Read((*buf)[hSize:]) + _, _ = reader.Read((*buf)[hSize:]) } // Now that outgoing flow controls are checked we can replenish str's write quota @@ -1014,7 +1040,7 @@ func (l *loopyWriter) processData() (bool, error) { dataItem.h = dataItem.h[hSize:] if remainingBytes == 0 { // All the data from that message was written out. - _ = dataItem.reader.Close() + _ = reader.Close() str.itl.dequeue() } if str.itl.isEmpty() { diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go index ef56592b9..5467fe971 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go +++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go @@ -309,11 +309,9 @@ func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts scheme = "https" } } - dynamicWindow := true icwz := int32(initialWindowSize) if opts.InitialConnWindowSize >= defaultWindowSize { icwz = opts.InitialConnWindowSize - dynamicWindow = false } writeBufSize := opts.WriteBufferSize readBufSize := opts.ReadBufferSize @@ -381,9 +379,8 @@ func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts t.controlBuf = newControlBuffer(t.ctxDone) if opts.InitialWindowSize >= defaultWindowSize { t.initialWindowSize = opts.InitialWindowSize - dynamicWindow = false } - if dynamicWindow { + if !opts.StaticWindowSize { t.bdpEst = &bdpEstimator{ bdp: initialWindowSize, updateFlowControl: t.updateFlowControl, @@ -1091,32 +1088,29 @@ func (t *http2Client) GracefulClose() { // Write formats the data into HTTP2 data frame(s) and sends it out. The caller // should proceed only if Write returns nil. func (t *http2Client) write(s *ClientStream, hdr []byte, data mem.BufferSlice, opts *WriteOptions) error { - reader := data.Reader() - if opts.Last { // If it's the last message, update stream state. if !s.compareAndSwapState(streamActive, streamWriteDone) { - _ = reader.Close() return errStreamDone } } else if s.getState() != streamActive { - _ = reader.Close() return errStreamDone } df := &dataFrame{ streamID: s.id, endStream: opts.Last, h: hdr, - reader: reader, + data: data, } - if hdr != nil || df.reader.Remaining() != 0 { // If it's not an empty data frame, check quota. - if err := s.wq.get(int32(len(hdr) + df.reader.Remaining())); err != nil { - _ = reader.Close() + dataLen := data.Len() + if hdr != nil || dataLen != 0 { // If it's not an empty data frame, check quota. + if err := s.wq.get(int32(len(hdr) + dataLen)); err != nil { return err } } + data.Ref() if err := t.controlBuf.put(df); err != nil { - _ = reader.Close() + data.Free() return err } t.incrMsgSent() diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go index e4c3731bd..9f725e15a 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go +++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go @@ -132,6 +132,10 @@ type http2Server struct { maxStreamID uint32 // max stream ID ever seen logger *grpclog.PrefixLogger + // setResetPingStrikes is stored as a closure instead of making this a + // method on http2Server to avoid a heap allocation when converting a method + // to a closure for passing to frames objects. + setResetPingStrikes func() } // NewServerTransport creates a http2 transport with conn and configuration @@ -176,16 +180,13 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, Val: config.MaxStreams, }) } - dynamicWindow := true iwz := int32(initialWindowSize) if config.InitialWindowSize >= defaultWindowSize { iwz = config.InitialWindowSize - dynamicWindow = false } icwz := int32(initialWindowSize) if config.InitialConnWindowSize >= defaultWindowSize { icwz = config.InitialConnWindowSize - dynamicWindow = false } if iwz != defaultWindowSize { isettings = append(isettings, http2.Setting{ @@ -266,6 +267,9 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, initialWindowSize: iwz, bufferPool: config.BufferPool, } + t.setResetPingStrikes = func() { + atomic.StoreUint32(&t.resetPingStrikes, 1) + } var czSecurity credentials.ChannelzSecurityValue if au, ok := authInfo.(credentials.ChannelzSecurityInfo); ok { czSecurity = au.GetSecurityValue() @@ -285,7 +289,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, t.logger = prefixLoggerForServerTransport(t) t.controlBuf = newControlBuffer(t.done) - if dynamicWindow { + if !config.StaticWindowSize { t.bdpEst = &bdpEstimator{ bdp: initialWindowSize, updateFlowControl: t.updateFlowControl, @@ -596,10 +600,25 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade return nil } } + + if s.ctx.Err() != nil { + t.mu.Unlock() + // Early abort in case the timeout was zero or so low it already fired. + t.controlBuf.put(&earlyAbortStream{ + httpStatus: http.StatusOK, + streamID: s.id, + contentSubtype: s.contentSubtype, + status: status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()), + rst: !frame.StreamEnded(), + }) + return nil + } + t.activeStreams[streamID] = s if len(t.activeStreams) == 1 { t.idle = time.Time{} } + // Start a timer to close the stream on reaching the deadline. if timeoutSet { // We need to wait for s.cancel to be updated before calling @@ -1016,10 +1035,6 @@ func (t *http2Server) writeHeader(s *ServerStream, md metadata.MD) error { return nil } -func (t *http2Server) setResetPingStrikes() { - atomic.StoreUint32(&t.resetPingStrikes, 1) -} - func (t *http2Server) writeHeaderLocked(s *ServerStream) error { // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields // first and create a slice of that exact size. @@ -1132,17 +1147,13 @@ func (t *http2Server) writeStatus(s *ServerStream, st *status.Status) error { // Write converts the data into HTTP2 data frame and sends it out. Non-nil error // is returns if it fails (e.g., framing error, transport error). func (t *http2Server) write(s *ServerStream, hdr []byte, data mem.BufferSlice, _ *WriteOptions) error { - reader := data.Reader() - if !s.isHeaderSent() { // Headers haven't been written yet. if err := t.writeHeader(s, nil); err != nil { - _ = reader.Close() return err } } else { // Writing headers checks for this condition. if s.getState() == streamDone { - _ = reader.Close() return t.streamContextErr(s) } } @@ -1150,15 +1161,16 @@ func (t *http2Server) write(s *ServerStream, hdr []byte, data mem.BufferSlice, _ df := &dataFrame{ streamID: s.id, h: hdr, - reader: reader, + data: data, onEachWrite: t.setResetPingStrikes, } - if err := s.wq.get(int32(len(hdr) + df.reader.Remaining())); err != nil { - _ = reader.Close() + dataLen := data.Len() + if err := s.wq.get(int32(len(hdr) + dataLen)); err != nil { return t.streamContextErr(s) } + data.Ref() if err := t.controlBuf.put(df); err != nil { - _ = reader.Close() + data.Free() return err } t.incrMsgSent() diff --git a/vendor/google.golang.org/grpc/internal/transport/http_util.go b/vendor/google.golang.org/grpc/internal/transport/http_util.go index 607d2c4ce..e3663f87f 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http_util.go +++ b/vendor/google.golang.org/grpc/internal/transport/http_util.go @@ -200,9 +200,6 @@ func decodeTimeout(s string) (time.Duration, error) { if err != nil { return 0, err } - if t == 0 { - return 0, fmt.Errorf("transport: timeout must be positive: %q", s) - } const maxHours = math.MaxInt64 / uint64(time.Hour) if d == time.Hour && t > maxHours { // This timeout would overflow math.MaxInt64; clamp it. diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go index 1730a639f..7dd53e80a 100644 --- a/vendor/google.golang.org/grpc/internal/transport/transport.go +++ b/vendor/google.golang.org/grpc/internal/transport/transport.go @@ -466,6 +466,7 @@ type ServerConfig struct { MaxHeaderListSize *uint32 HeaderTableSize *uint32 BufferPool mem.BufferPool + StaticWindowSize bool } // ConnectOptions covers all relevant options for communicating with the server. @@ -504,6 +505,8 @@ type ConnectOptions struct { MaxHeaderListSize *uint32 // The mem.BufferPool to use when reading/writing to the wire. BufferPool mem.BufferPool + // StaticWindowSize controls whether dynamic window sizing is enabled. + StaticWindowSize bool } // WriteOptions provides additional hints and information for message diff --git a/vendor/google.golang.org/grpc/mem/buffer_slice.go b/vendor/google.golang.org/grpc/mem/buffer_slice.go index 65002e2cc..af510d20c 100644 --- a/vendor/google.golang.org/grpc/mem/buffer_slice.go +++ b/vendor/google.golang.org/grpc/mem/buffer_slice.go @@ -137,6 +137,9 @@ type Reader interface { Close() error // Remaining returns the number of unread bytes remaining in the slice. Remaining() int + // Reset frees the currently held buffer slice and starts reading from the + // provided slice. This allows reusing the reader object. + Reset(s BufferSlice) } type sliceReader struct { @@ -150,6 +153,14 @@ func (r *sliceReader) Remaining() int { return r.len } +func (r *sliceReader) Reset(s BufferSlice) { + r.data.Free() + s.Ref() + r.data = s + r.len = s.Len() + r.bufferIdx = 0 +} + func (r *sliceReader) Close() error { r.data.Free() r.data = nil diff --git a/vendor/google.golang.org/grpc/reflection/grpc_reflection_v1/reflection_grpc.pb.go b/vendor/google.golang.org/grpc/reflection/grpc_reflection_v1/reflection_grpc.pb.go index 031082807..f4a361c64 100644 --- a/vendor/google.golang.org/grpc/reflection/grpc_reflection_v1/reflection_grpc.pb.go +++ b/vendor/google.golang.org/grpc/reflection/grpc_reflection_v1/reflection_grpc.pb.go @@ -90,7 +90,7 @@ type ServerReflectionServer interface { type UnimplementedServerReflectionServer struct{} func (UnimplementedServerReflectionServer) ServerReflectionInfo(grpc.BidiStreamingServer[ServerReflectionRequest, ServerReflectionResponse]) error { - return status.Errorf(codes.Unimplemented, "method ServerReflectionInfo not implemented") + return status.Error(codes.Unimplemented, "method ServerReflectionInfo not implemented") } func (UnimplementedServerReflectionServer) testEmbeddedByValue() {} diff --git a/vendor/google.golang.org/grpc/reflection/grpc_reflection_v1alpha/reflection_grpc.pb.go b/vendor/google.golang.org/grpc/reflection/grpc_reflection_v1alpha/reflection_grpc.pb.go index 80755d74d..0a43b521c 100644 --- a/vendor/google.golang.org/grpc/reflection/grpc_reflection_v1alpha/reflection_grpc.pb.go +++ b/vendor/google.golang.org/grpc/reflection/grpc_reflection_v1alpha/reflection_grpc.pb.go @@ -87,7 +87,7 @@ type ServerReflectionServer interface { type UnimplementedServerReflectionServer struct{} func (UnimplementedServerReflectionServer) ServerReflectionInfo(grpc.BidiStreamingServer[ServerReflectionRequest, ServerReflectionResponse]) error { - return status.Errorf(codes.Unimplemented, "method ServerReflectionInfo not implemented") + return status.Error(codes.Unimplemented, "method ServerReflectionInfo not implemented") } func (UnimplementedServerReflectionServer) testEmbeddedByValue() {} diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go index 976e70ae0..70fe23f55 100644 --- a/vendor/google.golang.org/grpc/server.go +++ b/vendor/google.golang.org/grpc/server.go @@ -179,6 +179,7 @@ type serverOptions struct { numServerWorkers uint32 bufferPool mem.BufferPool waitForHandlers bool + staticWindowSize bool } var defaultServerOptions = serverOptions{ @@ -279,6 +280,7 @@ func ReadBufferSize(s int) ServerOption { func InitialWindowSize(s int32) ServerOption { return newFuncServerOption(func(o *serverOptions) { o.initialWindowSize = s + o.staticWindowSize = true }) } @@ -287,6 +289,29 @@ func InitialWindowSize(s int32) ServerOption { func InitialConnWindowSize(s int32) ServerOption { return newFuncServerOption(func(o *serverOptions) { o.initialConnWindowSize = s + o.staticWindowSize = true + }) +} + +// StaticStreamWindowSize returns a ServerOption to set the initial stream +// window size to the value provided and disables dynamic flow control. +// The lower bound for window size is 64K and any value smaller than that +// will be ignored. +func StaticStreamWindowSize(s int32) ServerOption { + return newFuncServerOption(func(o *serverOptions) { + o.initialWindowSize = s + o.staticWindowSize = true + }) +} + +// StaticConnWindowSize returns a ServerOption to set the initial connection +// window size to the value provided and disables dynamic flow control. +// The lower bound for window size is 64K and any value smaller than that +// will be ignored. +func StaticConnWindowSize(s int32) ServerOption { + return newFuncServerOption(func(o *serverOptions) { + o.initialConnWindowSize = s + o.staticWindowSize = true }) } @@ -986,6 +1011,7 @@ func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport { MaxHeaderListSize: s.opts.maxHeaderListSize, HeaderTableSize: s.opts.headerTableSize, BufferPool: s.opts.bufferPool, + StaticWindowSize: s.opts.staticWindowSize, } st, err := transport.NewServerTransport(c, config) if err != nil { diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go index d58bb6471..ca6948926 100644 --- a/vendor/google.golang.org/grpc/stream.go +++ b/vendor/google.golang.org/grpc/stream.go @@ -1171,7 +1171,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { } else if err != nil { return toRPCErr(err) } - return toRPCErr(errors.New("grpc: client streaming protocol violation: get , want ")) + return status.Errorf(codes.Internal, "cardinality violation: expected for non server-streaming RPCs, but received another message") } func (a *csAttempt) finish(err error) { @@ -1495,7 +1495,7 @@ func (as *addrConnStream) RecvMsg(m any) (err error) { } else if err != nil { return toRPCErr(err) } - return toRPCErr(errors.New("grpc: client streaming protocol violation: get , want ")) + return status.Errorf(codes.Internal, "cardinality violation: expected for non server-streaming RPCs, but received another message") } func (as *addrConnStream) finish(err error) { diff --git a/vendor/google.golang.org/grpc/version.go b/vendor/google.golang.org/grpc/version.go index bd82673dc..da68b32e1 100644 --- a/vendor/google.golang.org/grpc/version.go +++ b/vendor/google.golang.org/grpc/version.go @@ -19,4 +19,4 @@ package grpc // Version is the current grpc version. -const Version = "1.73.0" +const Version = "1.74.0-dev" diff --git a/vendor/modules.txt b/vendor/modules.txt index 411b30dfa..ef2314b01 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -672,7 +672,7 @@ github.com/gogo/protobuf/protoc-gen-gogo/descriptor # github.com/golang-jwt/jwt/v4 v4.5.2 ## explicit; go 1.16 github.com/golang-jwt/jwt/v4 -# github.com/golang-jwt/jwt/v5 v5.2.2 +# github.com/golang-jwt/jwt/v5 v5.2.3 ## explicit; go 1.18 github.com/golang-jwt/jwt/v5 # github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da @@ -1213,7 +1213,7 @@ github.com/open-policy-agent/opa/v1/version # github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250707143759-32eaae12b2ce ## explicit; go 1.18 github.com/opencloud-eu/libre-graph-api-go -# github.com/opencloud-eu/reva/v2 v2.34.1-0.20250716074813-cfe225225b23 +# github.com/opencloud-eu/reva/v2 v2.34.1-0.20250717130558-81c1803fb574 ## explicit; go 1.24.1 github.com/opencloud-eu/reva/v2/cmd/revad/internal/grace github.com/opencloud-eu/reva/v2/cmd/revad/runtime @@ -2333,7 +2333,7 @@ google.golang.org/genproto/googleapis/api/httpbody ## explicit; go 1.23.0 google.golang.org/genproto/googleapis/rpc/errdetails google.golang.org/genproto/googleapis/rpc/status -# google.golang.org/grpc v1.73.0 +# google.golang.org/grpc v1.74.0 ## explicit; go 1.23.0 google.golang.org/grpc google.golang.org/grpc/attributes From dbd6a4db716bad1bac181b63c07f47723ffb2dbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Thu, 17 Jul 2025 15:58:27 +0200 Subject: [PATCH 6/8] Fix tests --- services/search/pkg/search/service.go | 7 +++++++ services/search/pkg/search/service_test.go | 4 ++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/services/search/pkg/search/service.go b/services/search/pkg/search/service.go index 0ef9eb37f..e4a2769b3 100644 --- a/services/search/pkg/search/service.go +++ b/services/search/pkg/search/service.go @@ -92,6 +92,10 @@ func (s *Service) Search(ctx context.Context, req *searchsvc.SearchRequest) (*se startTime := time.Now() success := false defer func() { + if s.metrics == nil { + return + } + status := "success" if !success { status = "error" @@ -444,6 +448,9 @@ func (s *Service) IndexSpace(spaceID *provider.StorageSpaceId) error { startTime := time.Now() success := false defer func() { + if s.metrics == nil { + return + } status := "success" if !success { status = "error" diff --git a/services/search/pkg/search/service_test.go b/services/search/pkg/search/service_test.go index f7e9d01e4..c6f8eb0ef 100644 --- a/services/search/pkg/search/service_test.go +++ b/services/search/pkg/search/service_test.go @@ -90,7 +90,7 @@ var _ = Describe("Searchprovider", func() { indexClient = &engineMocks.Engine{} extractor = &contentMocks.Extractor{} - s = search.NewService(gatewaySelector, indexClient, extractor, logger, &config.Config{}) + s = search.NewService(gatewaySelector, indexClient, extractor, nil, logger, &config.Config{}) gatewayClient.On("Authenticate", mock.Anything, mock.Anything).Return(&gateway.AuthenticateResponse{ Status: status.NewOK(ctx), @@ -110,7 +110,7 @@ var _ = Describe("Searchprovider", func() { Describe("New", func() { It("returns a new instance", func() { - s := search.NewService(gatewaySelector, indexClient, extractor, logger, &config.Config{}) + s := search.NewService(gatewaySelector, indexClient, extractor, nil, logger, &config.Config{}) Expect(s).ToNot(BeNil()) }) }) From 386e322050d80498bc884f635982db7a0a375fd8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Fri, 18 Jul 2025 08:27:06 +0200 Subject: [PATCH 7/8] Fix metric --- services/postprocessing/pkg/service/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/postprocessing/pkg/service/service.go b/services/postprocessing/pkg/service/service.go index 768bece9a..886d0599a 100644 --- a/services/postprocessing/pkg/service/service.go +++ b/services/postprocessing/pkg/service/service.go @@ -219,7 +219,7 @@ func (pps *PostprocessingService) processEvent(e raw.Event) error { } if ev.Failed { - pps.metrics.Finished.WithLabelValues("failed", string(pp.Status.Outcome)).Inc() + pps.metrics.Finished.WithLabelValues("failed").Inc() if !pp.StartTime.IsZero() { pps.metrics.Duration.WithLabelValues("failed").Observe(time.Since(pp.StartTime).Seconds()) } From 3bd7a61e463659f23adbf0567356de45786d9cfc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Fri, 18 Jul 2025 08:50:03 +0200 Subject: [PATCH 8/8] Document the metrics exposed in search and postprocessing --- services/postprocessing/README.md | 14 ++++++++++++++ services/search/README.md | 13 +++++++++++++ 2 files changed, 27 insertions(+) diff --git a/services/postprocessing/README.md b/services/postprocessing/README.md index cd8d87755..fba133f5d 100644 --- a/services/postprocessing/README.md +++ b/services/postprocessing/README.md @@ -119,3 +119,17 @@ Depending if you want to restart/resume all or defined failed uploads, different opencloud postprocessing resume -s "finished" # Equivalent to the above opencloud postprocessing resume -s "virusscan" # Resume all uploads currently in virusscan step ``` + +## Metrics + +The postprocessing service exposes the following prometheus metrics at `/metrics` (as configured using the `POSTPROCESSING_DEBUG_ADDR` env var): + +| Metric Name | Type | Description | Labels | +| --- | --- | --- | --- | +| `opencloud_postprocessing_build_info` | Gauge | Build information | `version` | +| `opencloud_postprocessing_events_outstanding_acks` | Gauge | Number of outstanding acks for events | | +| `opencloud_postprocessing_events_unprocessed` | Gauge | Number of unprocessed events | | +| `opencloud_postprocessing_events_redelivered` | Gauge | Number of redelivered events | | +| `opencloud_postprocessing_in_progress` | Gauge | Number of postprocessing events in progress | | +| `opencloud_postprocessing_finished` | Counter | Number of finished postprocessing events | `status` | +| `opencloud_postprocessing_duration_seconds` | Histogram | Duration of postprocessing operations in seconds | `status` | diff --git a/services/search/README.md b/services/search/README.md index 01c15de02..24b492b11 100644 --- a/services/search/README.md +++ b/services/search/README.md @@ -162,3 +162,16 @@ The indexing process tries to be self-healing in some situations. In the following example, let's assume a file tree `foo/bar/baz` exists. If the folder `bar` gets renamed to `new-bar`, the path to `baz` is no longer `foo/bar/baz` but `foo/new-bar/baz`. The search service checks the change and either just updates the path in the index or creates a new index for all items affected if none was present. + +## Metrics + +The search service exposes the following prometheus metrics at `/metrics` (as configured using the `SEARCH_DEBUG_ADDR` env var): + +| Metric Name | Type | Description | Labels | +| --- | --- | --- | --- | +| `opencloud_search_build_info` | Gauge | Build information | `version` | +| `opencloud_search_events_outstanding_acks` | Gauge | Number of outstanding acks for events | | +| `opencloud_search_events_unprocessed` | Gauge | Number of unprocessed events | | +| `opencloud_search_events_redelivered` | Gauge | Number of redelivered events | | +| `opencloud_search_search_duration_seconds` | Histogram | Duration of search operations in seconds | `status` | +| `opencloud_search_index_duration_seconds` | Histogram | Duration of indexing operations in seconds | `status` |