Merge pull request #1242 from aduffeck/metrics

Metrics
Andre Duffeck
2025-07-18 14:06:29 +02:00
committed by GitHub
55 changed files with 645 additions and 230 deletions

go.mod

@@ -35,7 +35,7 @@ require (
github.com/go-micro/plugins/v4/wrapper/trace/opentelemetry v1.2.0
github.com/go-playground/validator/v10 v10.27.0
github.com/gofrs/uuid v4.4.0+incompatible
github.com/golang-jwt/jwt/v5 v5.2.2
github.com/golang-jwt/jwt/v5 v5.2.3
github.com/golang/protobuf v1.5.4
github.com/google/go-cmp v0.7.0
github.com/google/go-tika v0.3.1
@@ -64,7 +64,7 @@ require (
github.com/onsi/gomega v1.37.0
github.com/open-policy-agent/opa v1.6.0
github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250707143759-32eaae12b2ce
github.com/opencloud-eu/reva/v2 v2.34.1-0.20250716074813-cfe225225b23
github.com/opencloud-eu/reva/v2 v2.34.1-0.20250717130558-81c1803fb574
github.com/orcaman/concurrent-map v1.0.0
github.com/pkg/errors v0.9.1
github.com/pkg/xattr v0.4.12
@@ -106,7 +106,7 @@ require (
golang.org/x/term v0.33.0
golang.org/x/text v0.27.0
google.golang.org/genproto/googleapis/api v0.0.0-20250603155806-513f23925822
google.golang.org/grpc v1.73.0
google.golang.org/grpc v1.74.0
google.golang.org/protobuf v1.36.6
gopkg.in/yaml.v2 v2.4.0
gotest.tools/v3 v3.5.2

go.sum

@@ -459,8 +459,8 @@ github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69
github.com/goji/httpauth v0.0.0-20160601135302-2da839ab0f4d/go.mod h1:nnjvkQ9ptGaCkuDUx6wNykzzlUixGxvkme+H/lnzb+A=
github.com/golang-jwt/jwt/v4 v4.5.2 h1:YtQM7lnr8iZ+j5q71MGKkNw9Mn7AjHM68uc9g5fXeUI=
github.com/golang-jwt/jwt/v4 v4.5.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
github.com/golang-jwt/jwt/v5 v5.2.2 h1:Rl4B7itRWVtYIHFrSNd7vhTiz9UpLdi6gZhZ3wEeDy8=
github.com/golang-jwt/jwt/v5 v5.2.2/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/golang-jwt/jwt/v5 v5.2.3 h1:kkGXqQOBSDDWRhWNXTFpqGSCMyh/PLnqUvMGJPDJDs0=
github.com/golang-jwt/jwt/v5 v5.2.3/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
@@ -868,8 +868,8 @@ github.com/opencloud-eu/go-micro-plugins/v4/store/nats-js-kv v0.0.0-202505121527
github.com/opencloud-eu/go-micro-plugins/v4/store/nats-js-kv v0.0.0-20250512152754-23325793059a/go.mod h1:pjcozWijkNPbEtX5SIQaxEW/h8VAVZYTLx+70bmB3LY=
github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250707143759-32eaae12b2ce h1:tjbIYsW5CFsEbCf5B/KN0Mo1oKU/K+oipgFm2B6wzG4=
github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250707143759-32eaae12b2ce/go.mod h1:pzatilMEHZFT3qV7C/X3MqOa3NlRQuYhlRhZTL+hN6Q=
github.com/opencloud-eu/reva/v2 v2.34.1-0.20250716074813-cfe225225b23 h1:FY6l12zi57efPXe9kVU1U6FB6HMuAV/t0XJPEU2XVDw=
github.com/opencloud-eu/reva/v2 v2.34.1-0.20250716074813-cfe225225b23/go.mod h1:5Zur6s3GoCbhdU09voU8EO+Ls71NiHgWYmhcvmngjwY=
github.com/opencloud-eu/reva/v2 v2.34.1-0.20250717130558-81c1803fb574 h1:sx7eqVOdc1i+fGk6f3yoAdtPE1vYzAtBchT9kVjAnTk=
github.com/opencloud-eu/reva/v2 v2.34.1-0.20250717130558-81c1803fb574/go.mod h1:UVPwuMjfgPekuh7unWavJSiPihgmk1GYF3xct0q3+X0=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
@@ -1624,8 +1624,8 @@ google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3Iji
google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
google.golang.org/grpc v1.73.0 h1:VIWSmpI2MegBtTuFt5/JWy2oXxtjJ/e89Z70ImfD2ok=
google.golang.org/grpc v1.73.0/go.mod h1:50sbHOUqWoCQGI8V2HQLJM0B+LMlIUjNSZmow7EVBQc=
google.golang.org/grpc v1.74.0 h1:sxRSkyLxlceWQiqDofxDot3d4u7DyoHPc7SBXMj8gGY=
google.golang.org/grpc v1.74.0/go.mod h1:NZUaK8dAMUfzhK6uxZ+9511LtOrk73UGWOFoNvz7z+s=
google.golang.org/grpc/examples v0.0.0-20211102180624-670c133e568e h1:m7aQHHqd0q89mRwhwS9Bx2rjyl/hsFAeta+uGrHsQaU=
google.golang.org/grpc/examples v0.0.0-20211102180624-670c133e568e/go.mod h1:gID3PKrg7pWKntu9Ss6zTLJ0ttC0X9IHgREOCZwbCVU=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=


@@ -119,3 +119,17 @@ Depending if you want to restart/resume all or defined failed uploads, different
opencloud postprocessing resume -s "finished" # Equivalent to the above
opencloud postprocessing resume -s "virusscan" # Resume all uploads currently in virusscan step
```
## Metrics
The postprocessing service exposes the following Prometheus metrics at `<debug_endpoint>/metrics` (the debug endpoint is configured via the `POSTPROCESSING_DEBUG_ADDR` environment variable); a fetch example follows the table:
| Metric Name | Type | Description | Labels |
| --- | --- | --- | --- |
| `opencloud_postprocessing_build_info` | Gauge | Build information | `version` |
| `opencloud_postprocessing_events_outstanding_acks` | Gauge | Number of outstanding acks for events | |
| `opencloud_postprocessing_events_unprocessed` | Gauge | Number of unprocessed events | |
| `opencloud_postprocessing_events_redelivered` | Gauge | Number of redelivered events | |
| `opencloud_postprocessing_in_progress` | Gauge | Number of postprocessing events in progress | |
| `opencloud_postprocessing_finished` | Counter | Number of finished postprocessing events | `status` |
| `opencloud_postprocessing_duration_seconds` | Histogram | Duration of postprocessing operations in seconds | `status` |
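
For a quick smoke test, the endpoint can be fetched with any HTTP client. A minimal Go sketch (the address is a placeholder; substitute whatever `POSTPROCESSING_DEBUG_ADDR` is set to):

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Placeholder address; use the configured POSTPROCESSING_DEBUG_ADDR.
	resp, err := http.Get("http://127.0.0.1:9255/metrics")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Print(string(body)) // Prometheus text exposition format
}
```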


@@ -0,0 +1,77 @@
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
// Namespace defines the namespace for the defined metrics.
Namespace = "opencloud"
// Subsystem defines the subsystem for the defined metrics.
Subsystem = "postprocessing"
)
// Metrics defines the available metrics of this service.
type Metrics struct {
// Counter *prometheus.CounterVec
BuildInfo *prometheus.GaugeVec
EventsOutstandingAcks prometheus.Gauge
EventsUnprocessed prometheus.Gauge
EventsRedelivered prometheus.Gauge
InProgress prometheus.Gauge
Finished *prometheus.CounterVec
Duration *prometheus.HistogramVec
}
// New initializes the available metrics.
func New() *Metrics {
m := &Metrics{
BuildInfo: promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "build_info",
Help: "Build information",
}, []string{"version"}),
EventsOutstandingAcks: promauto.NewGauge(prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "events_outstanding_acks",
Help: "Number of outstanding acks for events",
}),
EventsUnprocessed: promauto.NewGauge(prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "events_unprocessed",
Help: "Number of unprocessed events",
}),
EventsRedelivered: promauto.NewGauge(prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "events_redelivered",
Help: "Number of redelivered events",
}),
InProgress: promauto.NewGauge(prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "in_progress",
Help: "Number of postprocessing events in progress",
}),
Finished: promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "finished",
Help: "Number of finished postprocessing events",
}, []string{"status"}),
Duration: promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "duration_seconds",
Help: "Duration of postprocessing operations in seconds",
Buckets: []float64{0.1, 0.5, 1, 2.5, 5, 10, 30, 60, 120, 300, 600, 1200},
}, []string{"status"}),
}
return m
}
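
For orientation, a minimal usage sketch (assuming the same `metrics`, `version`, and `time` imports as the service diff below; the label values mirror that diff):

```go
func track(m *metrics.Metrics) {
	m.BuildInfo.WithLabelValues(version.GetString()).Set(1)

	m.InProgress.Inc() // an upload entered postprocessing
	start := time.Now()
	// ... postprocessing steps run here ...
	m.InProgress.Dec()
	m.Finished.WithLabelValues("succeeded").Inc()
	m.Duration.WithLabelValues("succeeded").Observe(time.Since(start).Seconds())
}
```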


@@ -24,6 +24,7 @@ type Postprocessing struct {
Failures int
InitiatorID string
Finished bool
StartTime time.Time
config config.Postprocessing
}


@@ -9,7 +9,9 @@ import (
"time"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/pkg/version"
"github.com/opencloud-eu/opencloud/services/postprocessing/pkg/config"
"github.com/opencloud-eu/opencloud/services/postprocessing/pkg/metrics"
"github.com/opencloud-eu/opencloud/services/postprocessing/pkg/postprocessing"
ctxpkg "github.com/opencloud-eu/reva/v2/pkg/ctx"
"github.com/opencloud-eu/reva/v2/pkg/events"
@@ -22,14 +24,15 @@ import (
// PostprocessingService is an instance of the service handling postprocessing of files
type PostprocessingService struct {
ctx context.Context
log log.Logger
events <-chan raw.Event
pub events.Publisher
steps []events.Postprocessingstep
store store.Store
c config.Postprocessing
tp trace.TracerProvider
ctx context.Context
log log.Logger
events <-chan raw.Event
pub events.Publisher
steps []events.Postprocessingstep
store store.Store
c config.Postprocessing
tp trace.TracerProvider
metrics *metrics.Metrics
}
var (
@@ -78,15 +81,20 @@ func NewPostprocessingService(ctx context.Context, logger log.Logger, sto store.
return nil, err
}
m := metrics.New()
m.BuildInfo.WithLabelValues(version.GetString()).Set(1)
monitorMetrics(raw, "postprocessing-pull", m, logger)
return &PostprocessingService{
ctx: ctx,
log: logger,
events: evs,
pub: pub,
steps: getSteps(cfg.Postprocessing),
store: sto,
c: cfg.Postprocessing,
tp: tp,
ctx: ctx,
log: logger,
events: evs,
pub: pub,
steps: getSteps(cfg.Postprocessing),
store: sto,
c: cfg.Postprocessing,
tp: tp,
metrics: m,
}, nil
}
@@ -149,7 +157,9 @@ func (pps *PostprocessingService) processEvent(e raw.Event) error {
Steps: pps.steps,
InitiatorID: e.InitiatorID,
ImpersonatingUser: ev.ImpersonatingUser,
StartTime: time.Now(),
}
pps.metrics.InProgress.Inc()
next = pp.Init(ev)
case events.PostprocessingStepFinished:
if ev.UploadID == "" {
@@ -200,17 +210,27 @@ func (pps *PostprocessingService) processEvent(e raw.Event) error {
}
})
case events.UploadReady:
pps.metrics.InProgress.Dec()
// the upload failed - let's keep it around for a while - but mark it as finished
pp, err = pps.getPP(pps.store, ev.UploadID)
if err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
return fmt.Errorf("%w: cannot get upload", ErrEvent)
}
if ev.Failed {
// the upload failed - let's keep it around for a while - but mark it as finished
pp, err = pps.getPP(pps.store, ev.UploadID)
if err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot get upload")
return fmt.Errorf("%w: cannot get upload", ErrEvent)
pps.metrics.Finished.WithLabelValues("failed").Inc()
if !pp.StartTime.IsZero() {
pps.metrics.Duration.WithLabelValues("failed").Observe(time.Since(pp.StartTime).Seconds())
}
pp.Finished = true
return storePP(pps.store, pp)
}
pps.metrics.Finished.WithLabelValues("succeeded").Inc()
if !pp.StartTime.IsZero() {
pps.metrics.Duration.WithLabelValues("succeeded").Observe(time.Since(pp.StartTime).Seconds())
}
// the storage provider thinks the upload is done - so no need to keep it any more
if err := pps.store.Delete(ev.UploadID); err != nil {
pps.log.Error().Str("uploadID", ev.UploadID).Err(err).Msg("cannot delete upload")
@@ -360,3 +380,25 @@ func (pps *PostprocessingService) findUploadsByStep(step events.Postprocessingst
return ids
}
func monitorMetrics(stream raw.Stream, name string, m *metrics.Metrics, logger log.Logger) {
ctx := context.Background()
consumer, err := stream.JetStream().Consumer(ctx, name)
if err != nil {
logger.Error().Err(err).Msg("failed to get consumer")
return
}
ticker := time.NewTicker(5 * time.Second)
go func() {
for range ticker.C {
info, err := consumer.Info(ctx)
if err != nil {
logger.Error().Err(err).Msg("failed to get consumer info")
continue
}
m.EventsOutstandingAcks.Set(float64(info.NumAckPending))
m.EventsUnprocessed.Set(float64(info.NumPending))
m.EventsRedelivered.Set(float64(info.NumRedelivered))
logger.Trace().Msg("updated postprocessing event metrics")
}
}()
}


@@ -15,4 +15,4 @@ packages:
Retriever: {}
github.com/opencloud-eu/opencloud/services/search/pkg/search:
interfaces:
Searcher: {}
Searcher: {}


@@ -162,3 +162,16 @@ The indexing process tries to be self-healing in some situations.
In the following example, let's assume a file tree `foo/bar/baz` exists.
If the folder `bar` gets renamed to `new-bar`, the path to `baz` is no longer `foo/bar/baz` but `foo/new-bar/baz`.
The search service checks the change and either just updates the path in the index or creates a new index for all items affected if none was present.
## Metrics
The search service exposes the following Prometheus metrics at `<debug_endpoint>/metrics` (the debug endpoint is configured via the `SEARCH_DEBUG_ADDR` environment variable); a sketch of the duration-recording pattern follows the table:
| Metric Name | Type | Description | Labels |
| --- | --- | --- | --- |
| `opencloud_search_build_info` | Gauge | Build information | `version` |
| `opencloud_search_events_outstanding_acks` | Gauge | Number of outstanding acks for events | |
| `opencloud_search_events_unprocessed` | Gauge | Number of unprocessed events | |
| `opencloud_search_events_redelivered` | Gauge | Number of redelivered events | |
| `opencloud_search_search_duration_seconds` | Histogram | Duration of search operations in seconds | `status` |
| `opencloud_search_index_duration_seconds` | Histogram | Duration of indexing operations in seconds | `status` |
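
Both duration histograms are fed via a defer pattern in the service code shown further down; a minimal sketch of that pattern (`timedSearch` is a hypothetical wrapper, not part of the PR):

```go
func timedSearch(m *metrics.Metrics, run func() error) error {
	start := time.Now()
	success := false
	defer func() {
		status := "success"
		if !success {
			status = "error"
		}
		m.SearchDuration.WithLabelValues(status).Observe(time.Since(start).Seconds())
	}()
	if err := run(); err != nil {
		return err
	}
	success = true
	return nil
}
```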


@@ -1,6 +1,9 @@
package metrics
import "github.com/prometheus/client_golang/prometheus"
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
// Namespace defines the namespace for the defined metrics.
@@ -13,21 +16,56 @@ var (
// Metrics defines the available metrics of this service.
type Metrics struct {
// Counter *prometheus.CounterVec
BuildInfo *prometheus.GaugeVec
BuildInfo *prometheus.GaugeVec
EventsOutstandingAcks prometheus.Gauge
EventsUnprocessed prometheus.Gauge
EventsRedelivered prometheus.Gauge
SearchDuration *prometheus.HistogramVec
IndexDuration *prometheus.HistogramVec
}
// New initializes the available metrics.
func New() *Metrics {
m := &Metrics{
BuildInfo: prometheus.NewGaugeVec(prometheus.GaugeOpts{
BuildInfo: promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "build_info",
Help: "Build information",
}, []string{"version"}),
EventsOutstandingAcks: promauto.NewGauge(prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "events_outstanding_acks",
Help: "Number of outstanding acks for events",
}),
EventsUnprocessed: promauto.NewGauge(prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "events_unprocessed",
Help: "Number of unprocessed events",
}),
EventsRedelivered: promauto.NewGauge(prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "events_redelivered",
Help: "Number of redelivered events",
}),
SearchDuration: promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "search_duration_seconds",
Help: "Duration of search operations in seconds",
Buckets: []float64{0.1, 0.5, 1, 2.5, 5, 10, 30, 60},
}, []string{"status"}),
IndexDuration: promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "index_duration_seconds",
Help: "Duration of indexing operations in seconds",
Buckets: []float64{0.1, 0.5, 1, 2.5, 5, 10, 30, 60, 120, 300, 600, 1200},
}, []string{"status"}),
}
_ = prometheus.Register(m.BuildInfo)
// TODO: implement metrics
return m
}


@@ -1,11 +1,13 @@
package search
import (
"context"
"time"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/services/search/pkg/config"
"github.com/opencloud-eu/opencloud/services/search/pkg/metrics"
"github.com/opencloud-eu/reva/v2/pkg/events"
"github.com/opencloud-eu/reva/v2/pkg/events/raw"
"github.com/opencloud-eu/reva/v2/pkg/storagespace"
@@ -13,7 +15,7 @@ import (
// HandleEvents listens to the needed events,
// it handles the whole resource indexing lifecycle.
func HandleEvents(s Searcher, stream raw.Stream, cfg *config.Config, logger log.Logger) error {
func HandleEvents(s Searcher, stream raw.Stream, cfg *config.Config, m *metrics.Metrics, logger log.Logger) error {
evts := []events.Unmarshaller{
events.ItemTrashed{},
events.ItemRestored{},
@@ -37,6 +39,10 @@ func HandleEvents(s Searcher, stream raw.Stream, cfg *config.Config, logger log.
return err
}
if m != nil {
monitorMetrics(stream, "search-pull", m, logger)
}
if cfg.Events.NumConsumers == 0 {
cfg.Events.NumConsumers = 1
}
@@ -103,3 +109,25 @@ func HandleEvents(s Searcher, stream raw.Stream, cfg *config.Config, logger log.
return nil
}
func monitorMetrics(stream raw.Stream, name string, m *metrics.Metrics, logger log.Logger) {
ctx := context.Background()
consumer, err := stream.JetStream().Consumer(ctx, name)
if err != nil {
logger.Error().Err(err).Msg("failed to get consumer")
return
}
ticker := time.NewTicker(5 * time.Second)
go func() {
for range ticker.C {
info, err := consumer.Info(ctx)
if err != nil {
logger.Error().Err(err).Msg("failed to get consumer info")
continue
}
m.EventsOutstandingAcks.Set(float64(info.NumAckPending))
m.EventsUnprocessed.Set(float64(info.NumPending))
m.EventsRedelivered.Set(float64(info.NumRedelivered))
logger.Trace().Msg("updated search event metrics")
}
}()
}


@@ -31,7 +31,7 @@ var _ = DescribeTable("events",
Events: config.Events{
AsyncUploads: asyncUploads,
},
}, log.NewLogger())
}, nil, log.NewLogger())
for _, mck := range mcks {
s.On(mck, mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) {


@@ -14,6 +14,7 @@ import (
rpcv1beta1 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
collaborationv1beta1 "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
libregraph "github.com/opencloud-eu/libre-graph-api-go"
revactx "github.com/opencloud-eu/reva/v2/pkg/ctx"
"github.com/opencloud-eu/reva/v2/pkg/errtypes"
"github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool"
@@ -21,7 +22,6 @@ import (
"github.com/opencloud-eu/reva/v2/pkg/storage/utils/walker"
"github.com/opencloud-eu/reva/v2/pkg/storagespace"
"github.com/opencloud-eu/reva/v2/pkg/utils"
libregraph "github.com/opencloud-eu/libre-graph-api-go"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/types/known/fieldmaskpb"
@@ -31,6 +31,7 @@ import (
"github.com/opencloud-eu/opencloud/services/search/pkg/config"
"github.com/opencloud-eu/opencloud/services/search/pkg/content"
"github.com/opencloud-eu/opencloud/services/search/pkg/engine"
"github.com/opencloud-eu/opencloud/services/search/pkg/metrics"
)
const (
@@ -59,6 +60,7 @@ type Service struct {
gatewaySelector pool.Selectable[gateway.GatewayAPIClient]
engine engine.Engine
extractor content.Extractor
metrics *metrics.Metrics
serviceAccountID string
serviceAccountSecret string
@@ -67,12 +69,13 @@ type Service struct {
var errSkipSpace error
// NewService creates a new Provider instance.
func NewService(gatewaySelector pool.Selectable[gateway.GatewayAPIClient], eng engine.Engine, extractor content.Extractor, logger log.Logger, cfg *config.Config) *Service {
func NewService(gatewaySelector pool.Selectable[gateway.GatewayAPIClient], eng engine.Engine, extractor content.Extractor, metrics *metrics.Metrics, logger log.Logger, cfg *config.Config) *Service {
var s = &Service{
gatewaySelector: gatewaySelector,
engine: eng,
logger: logger,
extractor: extractor,
metrics: metrics,
serviceAccountID: cfg.ServiceAccount.ServiceAccountID,
serviceAccountSecret: cfg.ServiceAccount.ServiceAccountSecret,
@@ -85,6 +88,21 @@ func NewService(gatewaySelector pool.Selectable[gateway.GatewayAPIClient], eng e
func (s *Service) Search(ctx context.Context, req *searchsvc.SearchRequest) (*searchsvc.SearchResponse, error) {
s.logger.Debug().Str("query", req.Query).Msg("performing a search")
// collect metrics
startTime := time.Now()
success := false
defer func() {
if s.metrics == nil {
return
}
status := "success"
if !success {
status = "error"
}
s.metrics.SearchDuration.WithLabelValues(status).Observe(time.Since(startTime).Seconds())
}()
gatewayClient, err := s.gatewaySelector.Next()
if err != nil {
return nil, err
@@ -255,6 +273,7 @@ func (s *Service) Search(ctx context.Context, req *searchsvc.SearchRequest) (*se
matches = matches[0:limit]
}
success = true
return &searchsvc.SearchResponse{
Matches: matches,
TotalMatches: total,
@@ -425,6 +444,20 @@ func (s *Service) IndexSpace(spaceID *provider.StorageSpaceId) error {
}
rootID.OpaqueId = rootID.SpaceId
// Collect metrics
startTime := time.Now()
success := false
defer func() {
if s.metrics == nil {
return
}
status := "success"
if !success {
status = "error"
}
s.metrics.IndexDuration.WithLabelValues(status).Observe(time.Since(startTime).Seconds())
}()
w := walker.NewWalker(s.gatewaySelector)
err = w.Walk(ownerCtx, &rootID, func(wd string, info *provider.ResourceInfo, err error) error {
if err != nil {
@@ -465,6 +498,7 @@ func (s *Service) IndexSpace(spaceID *provider.StorageSpaceId) error {
}
logDocCount(s.engine, s.logger)
success = true
return nil
}


@@ -90,7 +90,7 @@ var _ = Describe("Searchprovider", func() {
indexClient = &engineMocks.Engine{}
extractor = &contentMocks.Extractor{}
s = search.NewService(gatewaySelector, indexClient, extractor, logger, &config.Config{})
s = search.NewService(gatewaySelector, indexClient, extractor, nil, logger, &config.Config{})
gatewayClient.On("Authenticate", mock.Anything, mock.Anything).Return(&gateway.AuthenticateResponse{
Status: status.NewOK(ctx),
@@ -110,7 +110,7 @@ var _ = Describe("Searchprovider", func() {
Describe("New", func() {
It("returns a new instance", func() {
s := search.NewService(gatewaySelector, indexClient, extractor, logger, &config.Config{})
s := search.NewService(gatewaySelector, indexClient, extractor, nil, logger, &config.Config{})
Expect(s).ToNot(BeNil())
})
})


@@ -36,6 +36,7 @@ func Server(opts ...Option) (grpc.Service, func(), error) {
svc.Logger(options.Logger),
svc.JWTSecret(options.JWTSecret),
svc.TracerProvider(options.TraceProvider),
svc.Metrics(options.Metrics),
)
if err != nil {
options.Logger.Error().


@@ -3,6 +3,7 @@ package service
import (
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/services/search/pkg/config"
"github.com/opencloud-eu/opencloud/services/search/pkg/metrics"
"go.opentelemetry.io/otel/trace"
)
@@ -15,6 +16,7 @@ type Options struct {
Config *config.Config
JWTSecret string
TracerProvider trace.TracerProvider
Metrics *metrics.Metrics
}
func newOptions(opts ...Option) Options {
@@ -54,3 +56,12 @@ func TracerProvider(val trace.TracerProvider) Option {
o.TracerProvider = val
}
}
// Metrics provides a function to set the Metrics option.
func Metrics(val *metrics.Metrics) Option {
return func(o *Options) {
if val != nil {
o.Metrics = val
}
}
}


@@ -79,7 +79,7 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, func(), error)
return nil, teardown, fmt.Errorf("unknown search extractor: %s", cfg.Extractor.Type)
}
ss := search.NewService(selector, eng, extractor, logger, cfg)
ss := search.NewService(selector, eng, extractor, options.Metrics, logger, cfg)
// setup event handling
@@ -98,7 +98,7 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, func(), error)
return nil, teardown, err
}
if err := search.HandleEvents(ss, stream, cfg, logger); err != nil {
if err := search.HandleEvents(ss, stream, cfg, options.Metrics, logger); err != nil {
return nil, teardown, err
}


@@ -155,7 +155,7 @@ stored in base64 encoded form, which was redundant with the information in the
type Token struct {
Raw string // Raw contains the raw token
Method SigningMethod // Method is the signing method used or to be used
Header map[string]interface{} // Header is the first segment of the token in decoded form
Header map[string]any // Header is the first segment of the token in decoded form
Claims Claims // Claims is the second segment of the token in decoded form
Signature []byte // Signature is the third segment of the token in decoded form
Valid bool // Valid specifies if the token is valid


@@ -55,7 +55,7 @@ func (m *SigningMethodECDSA) Alg() string {
// Verify implements token verification for the SigningMethod.
// For this verify method, key must be an ecdsa.PublicKey struct
func (m *SigningMethodECDSA) Verify(signingString string, sig []byte, key interface{}) error {
func (m *SigningMethodECDSA) Verify(signingString string, sig []byte, key any) error {
// Get the key
var ecdsaKey *ecdsa.PublicKey
switch k := key.(type) {
@@ -89,7 +89,7 @@ func (m *SigningMethodECDSA) Verify(signingString string, sig []byte, key interf
// Sign implements token signing for the SigningMethod.
// For this signing method, key must be an ecdsa.PrivateKey struct
func (m *SigningMethodECDSA) Sign(signingString string, key interface{}) ([]byte, error) {
func (m *SigningMethodECDSA) Sign(signingString string, key any) ([]byte, error) {
// Get the key
var ecdsaKey *ecdsa.PrivateKey
switch k := key.(type) {


@@ -23,7 +23,7 @@ func ParseECPrivateKeyFromPEM(key []byte) (*ecdsa.PrivateKey, error) {
}
// Parse the key
var parsedKey interface{}
var parsedKey any
if parsedKey, err = x509.ParseECPrivateKey(block.Bytes); err != nil {
if parsedKey, err = x509.ParsePKCS8PrivateKey(block.Bytes); err != nil {
return nil, err
@@ -50,7 +50,7 @@ func ParseECPublicKeyFromPEM(key []byte) (*ecdsa.PublicKey, error) {
}
// Parse the key
var parsedKey interface{}
var parsedKey any
if parsedKey, err = x509.ParsePKIXPublicKey(block.Bytes); err != nil {
if cert, err := x509.ParseCertificate(block.Bytes); err == nil {
parsedKey = cert.PublicKey


@@ -33,7 +33,7 @@ func (m *SigningMethodEd25519) Alg() string {
// Verify implements token verification for the SigningMethod.
// For this verify method, key must be an ed25519.PublicKey
func (m *SigningMethodEd25519) Verify(signingString string, sig []byte, key interface{}) error {
func (m *SigningMethodEd25519) Verify(signingString string, sig []byte, key any) error {
var ed25519Key ed25519.PublicKey
var ok bool
@@ -55,7 +55,7 @@ func (m *SigningMethodEd25519) Verify(signingString string, sig []byte, key inte
// Sign implements token signing for the SigningMethod.
// For this signing method, key must be an ed25519.PrivateKey
func (m *SigningMethodEd25519) Sign(signingString string, key interface{}) ([]byte, error) {
func (m *SigningMethodEd25519) Sign(signingString string, key any) ([]byte, error) {
var ed25519Key crypto.Signer
var ok bool


@@ -24,7 +24,7 @@ func ParseEdPrivateKeyFromPEM(key []byte) (crypto.PrivateKey, error) {
}
// Parse the key
var parsedKey interface{}
var parsedKey any
if parsedKey, err = x509.ParsePKCS8PrivateKey(block.Bytes); err != nil {
return nil, err
}
@@ -49,7 +49,7 @@ func ParseEdPublicKeyFromPEM(key []byte) (crypto.PublicKey, error) {
}
// Parse the key
var parsedKey interface{}
var parsedKey any
if parsedKey, err = x509.ParsePKIXPublicKey(block.Bytes); err != nil {
return nil, err
}


@@ -55,7 +55,7 @@ func (m *SigningMethodHMAC) Alg() string {
// about this, and why we intentionally are not supporting string as a key can
// be found on our usage guide
// https://golang-jwt.github.io/jwt/usage/signing_methods/#signing-methods-and-key-types.
func (m *SigningMethodHMAC) Verify(signingString string, sig []byte, key interface{}) error {
func (m *SigningMethodHMAC) Verify(signingString string, sig []byte, key any) error {
// Verify the key is the right type
keyBytes, ok := key.([]byte)
if !ok {
@@ -88,7 +88,7 @@ func (m *SigningMethodHMAC) Verify(signingString string, sig []byte, key interfa
// cryptographically random source, e.g. crypto/rand. Additional information
// about this, and why we intentionally are not supporting string as a key can
// be found on our usage guide https://golang-jwt.github.io/jwt/usage/signing_methods/.
func (m *SigningMethodHMAC) Sign(signingString string, key interface{}) ([]byte, error) {
func (m *SigningMethodHMAC) Sign(signingString string, key any) ([]byte, error) {
if keyBytes, ok := key.([]byte); ok {
if !m.Hash.Available() {
return nil, ErrHashUnavailable


@@ -5,9 +5,9 @@ import (
"fmt"
)
// MapClaims is a claims type that uses the map[string]interface{} for JSON
// MapClaims is a claims type that uses the map[string]any for JSON
// decoding. This is the default claims type if you don't supply one
type MapClaims map[string]interface{}
type MapClaims map[string]any
// GetExpirationTime implements the Claims interface.
func (m MapClaims) GetExpirationTime() (*NumericDate, error) {
@@ -73,7 +73,7 @@ func (m MapClaims) parseClaimsString(key string) (ClaimStrings, error) {
cs = append(cs, v)
case []string:
cs = v
case []interface{}:
case []any:
for _, a := range v {
vs, ok := a.(string)
if !ok {
@@ -92,7 +92,7 @@ func (m MapClaims) parseClaimsString(key string) (ClaimStrings, error) {
func (m MapClaims) parseString(key string) (string, error) {
var (
ok bool
raw interface{}
raw any
iss string
)
raw, ok = m[key]


@@ -25,7 +25,7 @@ func (m *signingMethodNone) Alg() string {
}
// Only allow 'none' alg type if UnsafeAllowNoneSignatureType is specified as the key
func (m *signingMethodNone) Verify(signingString string, sig []byte, key interface{}) (err error) {
func (m *signingMethodNone) Verify(signingString string, sig []byte, key any) (err error) {
// Key must be UnsafeAllowNoneSignatureType to prevent accidentally
// accepting 'none' signing method
if _, ok := key.(unsafeNoneMagicConstant); !ok {
@@ -41,7 +41,7 @@ func (m *signingMethodNone) Verify(signingString string, sig []byte, key interfa
}
// Only allow 'none' signing if UnsafeAllowNoneSignatureType is specified as the key
func (m *signingMethodNone) Sign(signingString string, key interface{}) ([]byte, error) {
func (m *signingMethodNone) Sign(signingString string, key any) ([]byte, error) {
if _, ok := key.(unsafeNoneMagicConstant); ok {
return []byte{}, nil
}


@@ -66,20 +66,37 @@ func WithExpirationRequired() ParserOption {
}
}
// WithAudience configures the validator to require the specified audience in
// the `aud` claim. Validation will fail if the audience is not listed in the
// token or the `aud` claim is missing.
// WithAudience configures the validator to require any of the specified
// audiences in the `aud` claim. Validation will fail if the audience is not
// listed in the token or the `aud` claim is missing.
//
// NOTE: While the `aud` claim is OPTIONAL in a JWT, the handling of it is
// application-specific. Since this validation API is helping developers in
// writing secure applications, we decided to REQUIRE the existence of the claim,
// if an audience is expected.
func WithAudience(aud string) ParserOption {
func WithAudience(aud ...string) ParserOption {
return func(p *Parser) {
p.validator.expectedAud = aud
}
}
// WithAllAudiences configures the validator to require all the specified
// audiences in the `aud` claim. Validation will fail if the specified audiences
// are not listed in the token or the `aud` claim is missing. Duplicates within
// the list are de-duplicated since internally, we use a map to look up the
// audiences.
//
// NOTE: While the `aud` claim is OPTIONAL in a JWT, the handling of it is
// application-specific. Since this validation API is helping developers in
// writing secure applications, we decided to REQUIRE the existence of the claim,
// if an audience is expected.
func WithAllAudiences(aud ...string) ParserOption {
return func(p *Parser) {
p.validator.expectedAud = aud
p.validator.expectAllAud = true
}
}
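
A usage sketch for the two options (`tokenString` and `keyFunc` are assumed inputs):

```go
func parseWithAudiences(tokenString string, keyFunc jwt.Keyfunc) error {
	// Any one of the listed audiences satisfies the check:
	if _, err := jwt.Parse(tokenString, keyFunc, jwt.WithAudience("api", "web")); err != nil {
		return err
	}
	// All listed audiences must be present in the aud claim:
	_, err := jwt.Parse(tokenString, keyFunc, jwt.WithAllAudiences("api", "web"))
	return err
}
```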
// WithIssuer configures the validator to require the specified issuer in the
// `iss` claim. Validation will fail if a different issuer is specified in the
// token or the `iss` claim is missing.


@@ -46,7 +46,7 @@ func (m *SigningMethodRSA) Alg() string {
// Verify implements token verification for the SigningMethod
// For this signing method, must be an *rsa.PublicKey structure.
func (m *SigningMethodRSA) Verify(signingString string, sig []byte, key interface{}) error {
func (m *SigningMethodRSA) Verify(signingString string, sig []byte, key any) error {
var rsaKey *rsa.PublicKey
var ok bool
@@ -67,7 +67,7 @@ func (m *SigningMethodRSA) Verify(signingString string, sig []byte, key interfac
// Sign implements token signing for the SigningMethod
// For this signing method, must be an *rsa.PrivateKey structure.
func (m *SigningMethodRSA) Sign(signingString string, key interface{}) ([]byte, error) {
func (m *SigningMethodRSA) Sign(signingString string, key any) ([]byte, error) {
var rsaKey *rsa.PrivateKey
var ok bool


@@ -82,7 +82,7 @@ func init() {
// Verify implements token verification for the SigningMethod.
// For this verify method, key must be an rsa.PublicKey struct
func (m *SigningMethodRSAPSS) Verify(signingString string, sig []byte, key interface{}) error {
func (m *SigningMethodRSAPSS) Verify(signingString string, sig []byte, key any) error {
var rsaKey *rsa.PublicKey
switch k := key.(type) {
case *rsa.PublicKey:
@@ -108,7 +108,7 @@ func (m *SigningMethodRSAPSS) Verify(signingString string, sig []byte, key inter
// Sign implements token signing for the SigningMethod.
// For this signing method, key must be an rsa.PrivateKey struct
func (m *SigningMethodRSAPSS) Sign(signingString string, key interface{}) ([]byte, error) {
func (m *SigningMethodRSAPSS) Sign(signingString string, key any) ([]byte, error) {
var rsaKey *rsa.PrivateKey
switch k := key.(type) {


@@ -23,7 +23,7 @@ func ParseRSAPrivateKeyFromPEM(key []byte) (*rsa.PrivateKey, error) {
return nil, ErrKeyMustBePEMEncoded
}
var parsedKey interface{}
var parsedKey any
if parsedKey, err = x509.ParsePKCS1PrivateKey(block.Bytes); err != nil {
if parsedKey, err = x509.ParsePKCS8PrivateKey(block.Bytes); err != nil {
return nil, err
@@ -53,7 +53,7 @@ func ParseRSAPrivateKeyFromPEMWithPassword(key []byte, password string) (*rsa.Pr
return nil, ErrKeyMustBePEMEncoded
}
var parsedKey interface{}
var parsedKey any
var blockDecrypted []byte
if blockDecrypted, err = x509.DecryptPEMBlock(block, []byte(password)); err != nil {
@@ -86,7 +86,7 @@ func ParseRSAPublicKeyFromPEM(key []byte) (*rsa.PublicKey, error) {
}
// Parse the key
var parsedKey interface{}
var parsedKey any
if parsedKey, err = x509.ParsePKIXPublicKey(block.Bytes); err != nil {
if cert, err := x509.ParseCertificate(block.Bytes); err == nil {
parsedKey = cert.PublicKey


@@ -12,9 +12,9 @@ var signingMethodLock = new(sync.RWMutex)
// signature in Sign. The signature is then usually base64 encoded as part of a
// JWT.
type SigningMethod interface {
Verify(signingString string, sig []byte, key interface{}) error // Returns nil if signature is valid
Sign(signingString string, key interface{}) ([]byte, error) // Returns signature or error
Alg() string // returns the alg identifier for this method (example: 'HS256')
Verify(signingString string, sig []byte, key any) error // Returns nil if signature is valid
Sign(signingString string, key any) ([]byte, error) // Returns signature or error
Alg() string // returns the alg identifier for this method (example: 'HS256')
}
// RegisterSigningMethod registers the "alg" name and a factory function for signing method.


@@ -11,9 +11,9 @@ import (
// Token. This allows you to use properties in the Header of the token (such as
// `kid`) to identify which key to use.
//
// The returned interface{} may be a single key or a VerificationKeySet containing
// The returned any may be a single key or a VerificationKeySet containing
// multiple keys.
type Keyfunc func(*Token) (interface{}, error)
type Keyfunc func(*Token) (any, error)
// VerificationKey represents a public or secret key for verifying a token's signature.
type VerificationKey interface {
@@ -28,12 +28,12 @@ type VerificationKeySet struct {
// Token represents a JWT Token. Different fields will be used depending on
// whether you're creating or parsing/verifying a token.
type Token struct {
Raw string // Raw contains the raw token. Populated when you [Parse] a token
Method SigningMethod // Method is the signing method used or to be used
Header map[string]interface{} // Header is the first segment of the token in decoded form
Claims Claims // Claims is the second segment of the token in decoded form
Signature []byte // Signature is the third segment of the token in decoded form. Populated when you Parse a token
Valid bool // Valid specifies if the token is valid. Populated when you Parse/Verify a token
Raw string // Raw contains the raw token. Populated when you [Parse] a token
Method SigningMethod // Method is the signing method used or to be used
Header map[string]any // Header is the first segment of the token in decoded form
Claims Claims // Claims is the second segment of the token in decoded form
Signature []byte // Signature is the third segment of the token in decoded form. Populated when you Parse a token
Valid bool // Valid specifies if the token is valid. Populated when you Parse/Verify a token
}
// New creates a new [Token] with the specified signing method and an empty map
@@ -46,7 +46,7 @@ func New(method SigningMethod, opts ...TokenOption) *Token {
// claims. Additional options can be specified, but are currently unused.
func NewWithClaims(method SigningMethod, claims Claims, opts ...TokenOption) *Token {
return &Token{
Header: map[string]interface{}{
Header: map[string]any{
"typ": "JWT",
"alg": method.Alg(),
},
@@ -60,7 +60,7 @@ func NewWithClaims(method SigningMethod, claims Claims, opts ...TokenOption) *To
// https://golang-jwt.github.io/jwt/usage/signing_methods/#signing-methods-and-key-types
// for an overview of the different signing methods and their respective key
// types.
func (t *Token) SignedString(key interface{}) (string, error) {
func (t *Token) SignedString(key any) (string, error) {
sstr, err := t.SigningString()
if err != nil {
return "", err


@@ -103,7 +103,7 @@ func (date *NumericDate) UnmarshalJSON(b []byte) (err error) {
type ClaimStrings []string
func (s *ClaimStrings) UnmarshalJSON(data []byte) (err error) {
var value interface{}
var value any
if err = json.Unmarshal(data, &value); err != nil {
return err
@@ -116,7 +116,7 @@ func (s *ClaimStrings) UnmarshalJSON(data []byte) (err error) {
aud = append(aud, v)
case []string:
aud = ClaimStrings(v)
case []interface{}:
case []any:
for _, vv := range v {
vs, ok := vv.(string)
if !ok {


@@ -1,8 +1,8 @@
package jwt
import (
"crypto/subtle"
"fmt"
"slices"
"time"
)
@@ -52,8 +52,12 @@ type Validator struct {
verifyIat bool
// expectedAud contains the audience this token expects. Supplying an empty
// string will disable aud checking.
expectedAud string
// slice will disable aud checking.
expectedAud []string
// expectAllAud specifies whether all expected audiences must be present in
// the token. If false, only one of the expected audiences must be present.
expectAllAud bool
// expectedIss contains the issuer this token expects. Supplying an empty
// string will disable iss checking.
@@ -88,7 +92,7 @@ func NewValidator(opts ...ParserOption) *Validator {
func (v *Validator) Validate(claims Claims) error {
var (
now time.Time
errs []error = make([]error, 0, 6)
errs = make([]error, 0, 6)
err error
)
@@ -120,8 +124,8 @@ func (v *Validator) Validate(claims Claims) error {
}
// If we have an expected audience, we also require the audience claim
if v.expectedAud != "" {
if err = v.verifyAudience(claims, v.expectedAud, true); err != nil {
if len(v.expectedAud) > 0 {
if err = v.verifyAudience(claims, v.expectedAud, v.expectAllAud); err != nil {
errs = append(errs, err)
}
}
@@ -226,33 +230,39 @@ func (v *Validator) verifyNotBefore(claims Claims, cmp time.Time, required bool)
//
// Additionally, if any error occurs while retrieving the claim, e.g., when its
// the wrong type, an ErrTokenUnverifiable error will be returned.
func (v *Validator) verifyAudience(claims Claims, cmp string, required bool) error {
func (v *Validator) verifyAudience(claims Claims, cmp []string, expectAllAud bool) error {
aud, err := claims.GetAudience()
if err != nil {
return err
}
if len(aud) == 0 {
// Check that aud exists and is not empty. We only require the aud claim
// if we expect at least one audience to be present.
if len(aud) == 0 || len(aud) == 1 && aud[0] == "" {
required := len(v.expectedAud) > 0
return errorIfRequired(required, "aud")
}
// use a var here to keep constant time compare when looping over a number of claims
result := false
var stringClaims string
for _, a := range aud {
if subtle.ConstantTimeCompare([]byte(a), []byte(cmp)) != 0 {
result = true
if !expectAllAud {
for _, a := range aud {
// If we only expect one match, we can stop early if we find a match
if slices.Contains(cmp, a) {
return nil
}
}
stringClaims = stringClaims + a
return ErrTokenInvalidAudience
}
// case where "" is sent in one or many aud claims
if stringClaims == "" {
return errorIfRequired(required, "aud")
// Note that we are looping cmp here to ensure that all expected audiences
// are present in the aud claim.
for _, a := range cmp {
if !slices.Contains(aud, a) {
return ErrTokenInvalidAudience
}
}
return errorIfFalse(result, ErrTokenInvalidAudience)
return nil
}
// verifyIssuer compares the iss claim in claims against cmp.

View File

@@ -130,6 +130,7 @@ type commonLoggingResponseWriter interface {
http.Flusher
Status() int
Size() int
Unwrap() http.ResponseWriter
}
// responseLogger is wrapper of http.ResponseWriter that keeps track of its HTTP
@@ -170,6 +171,10 @@ func (l *responseLogger) Flush() {
}
}
func (l *responseLogger) Unwrap() http.ResponseWriter {
return l.w
}
type hijackLogger struct {
responseLogger
}
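
For context, the new `Unwrap` method lets `http.ResponseController` (Go 1.20+) reach the wrapped writer; an illustrative handler (a sketch, not part of the change):

```go
func handler(w http.ResponseWriter, r *http.Request) {
	rc := http.NewResponseController(w)
	_, _ = w.Write([]byte("partial response"))
	_ = rc.Flush() // Flush is located through the Unwrap chain
}
```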


@@ -21,7 +21,9 @@
package mocks
import (
jetstream "github.com/nats-io/nats.go/jetstream"
events "github.com/opencloud-eu/reva/v2/pkg/events"
mock "github.com/stretchr/testify/mock"
raw "github.com/opencloud-eu/reva/v2/pkg/events/raw"
@@ -112,6 +114,53 @@ func (_c *Stream_Consume_Call) RunAndReturn(run func(string, ...events.Unmarshal
return _c
}
// JetStream provides a mock function with no fields
func (_m *Stream) JetStream() jetstream.Stream {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for JetStream")
}
var r0 jetstream.Stream
if rf, ok := ret.Get(0).(func() jetstream.Stream); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(jetstream.Stream)
}
}
return r0
}
// Stream_JetStream_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'JetStream'
type Stream_JetStream_Call struct {
*mock.Call
}
// JetStream is a helper method to define mock.On call
func (_e *Stream_Expecter) JetStream() *Stream_JetStream_Call {
return &Stream_JetStream_Call{Call: _e.mock.On("JetStream")}
}
func (_c *Stream_JetStream_Call) Run(run func()) *Stream_JetStream_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *Stream_JetStream_Call) Return(_a0 jetstream.Stream) *Stream_JetStream_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *Stream_JetStream_Call) RunAndReturn(run func() jetstream.Stream) *Stream_JetStream_Call {
_c.Call.Return(run)
return _c
}
// NewStream creates a new instance of Stream. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewStream(t interface {


@@ -62,10 +62,11 @@ func (re *Event) InProgress() error {
type Stream interface {
Consume(group string, evs ...events.Unmarshaller) (<-chan Event, error)
JetStream() jetstream.Stream
}
type RawStream struct {
Js jetstream.Stream
js jetstream.Stream
c Config
}
@@ -130,7 +131,7 @@ func FromConfig(ctx context.Context, name string, cfg Config) (Stream, error) {
}
s = &RawStream{
Js: js,
js: js,
c: cfg,
}
return nil
@@ -186,7 +187,7 @@ func (s *RawStream) Consume(group string, evs ...events.Unmarshaller) (<-chan Ev
}
func (s *RawStream) consumeRaw(group string) (<-chan RawEvent, error) {
consumer, err := s.Js.CreateOrUpdateConsumer(context.Background(), jetstream.ConsumerConfig{
consumer, err := s.js.CreateOrUpdateConsumer(context.Background(), jetstream.ConsumerConfig{
Durable: group,
DeliverPolicy: jetstream.DeliverNewPolicy,
AckPolicy: jetstream.AckExplicitPolicy, // Require manual acknowledgment
@@ -214,3 +215,7 @@ func (s *RawStream) consumeRaw(group string) (<-chan RawEvent, error) {
return channel, nil
}
func (s *RawStream) JetStream() jetstream.Stream {
return s.js
}


@@ -360,6 +360,10 @@ type Balancer interface {
// call SubConn.Shutdown for its existing SubConns; however, this will be
// required in a future release, so it is recommended.
Close()
// ExitIdle instructs the LB policy to reconnect to backends / exit the
// IDLE state, if appropriate and possible. Note that SubConns that enter
// the IDLE state will not reconnect until SubConn.Connect is called.
ExitIdle()
}
// ExitIdler is an optional interface for balancers to implement. If
@@ -367,8 +371,8 @@ type Balancer interface {
// the ClientConn is idle. If unimplemented, ClientConn.Connect will cause
// all SubConns to connect.
//
// Notice: it will be required for all balancers to implement this in a future
// release.
// Deprecated: All balancers must implement this interface. This interface will
// be removed in a future release.
type ExitIdler interface {
// ExitIdle instructs the LB policy to reconnect to backends / exit the
// IDLE state, if appropriate and possible. Note that SubConns that enter


@@ -45,7 +45,15 @@ type ChildState struct {
// Balancer exposes only the ExitIdler interface of the child LB policy.
// Other methods of the child policy are called only by endpointsharding.
Balancer balancer.ExitIdler
Balancer ExitIdler
}
// ExitIdler provides access to only the ExitIdle method of the child balancer.
type ExitIdler interface {
// ExitIdle instructs the LB policy to reconnect to backends / exit the
// IDLE state, if appropriate and possible. Note that SubConns that enter
// the IDLE state will not reconnect until SubConn.Connect is called.
ExitIdle()
}
// Options are the options to configure the behaviour of the
@@ -205,6 +213,16 @@ func (es *endpointSharding) Close() {
}
}
func (es *endpointSharding) ExitIdle() {
es.childMu.Lock()
defer es.childMu.Unlock()
for _, bw := range es.children.Load().Values() {
if !bw.isClosed {
bw.child.ExitIdle()
}
}
}
// updateState updates this component's state. It sends the aggregated state,
// and a picker with round robin behavior with all the child states present if
// needed.
@@ -326,15 +344,13 @@ func (bw *balancerWrapper) UpdateState(state balancer.State) {
// ExitIdle pings an IDLE child balancer to exit idle in a new goroutine to
// avoid deadlocks due to synchronous balancer state updates.
func (bw *balancerWrapper) ExitIdle() {
if ei, ok := bw.child.(balancer.ExitIdler); ok {
go func() {
bw.es.childMu.Lock()
if !bw.isClosed {
ei.ExitIdle()
}
bw.es.childMu.Unlock()
}()
}
go func() {
bw.es.childMu.Lock()
if !bw.isClosed {
bw.child.ExitIdle()
}
bw.es.childMu.Unlock()
}()
}
// updateClientConnStateLocked delivers the ClientConnState to the child


@@ -54,18 +54,9 @@ func init() {
balancer.Register(pickfirstBuilder{})
}
type (
// enableHealthListenerKeyType is a unique key type used in resolver
// attributes to indicate whether the health listener usage is enabled.
enableHealthListenerKeyType struct{}
// managedByPickfirstKeyType is an attribute key type to inform Outlier
// Detection that the generic health listener is being used.
// TODO: https://github.com/grpc/grpc-go/issues/7915 - Remove this when
// implementing the dualstack design. This is a hack. Once Dualstack is
// completed, outlier detection will stop sending ejection updates through
// the connectivity listener.
managedByPickfirstKeyType struct{}
)
// enableHealthListenerKeyType is a unique key type used in resolver
// attributes to indicate whether the health listener usage is enabled.
type enableHealthListenerKeyType struct{}
var (
logger = grpclog.Component("pick-first-leaf-lb")
@@ -149,17 +140,6 @@ func EnableHealthListener(state resolver.State) resolver.State {
return state
}
// IsManagedByPickfirst returns whether an address belongs to a SubConn
// managed by the pickfirst LB policy.
// TODO: https://github.com/grpc/grpc-go/issues/7915 - This is a hack to disable
// outlier_detection via the with connectivity listener when using pick_first.
// Once Dualstack changes are complete, all SubConns will be created by
// pick_first and outlier detection will only use the health listener for
// ejection. This hack can then be removed.
func IsManagedByPickfirst(addr resolver.Address) bool {
return addr.BalancerAttributes.Value(managedByPickfirstKeyType{}) != nil
}
type pfConfig struct {
serviceconfig.LoadBalancingConfig `json:"-"`
@@ -186,7 +166,6 @@ type scData struct {
}
func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) {
addr.BalancerAttributes = addr.BalancerAttributes.WithValue(managedByPickfirstKeyType{}, true)
sd := &scData{
rawConnectivityState: connectivity.Idle,
effectiveState: connectivity.Idle,


@@ -70,10 +70,3 @@ func (b *rrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
ResolverState: pickfirstleaf.EnableHealthListener(ccs.ResolverState),
})
}
func (b *rrBalancer) ExitIdle() {
// Should always be ok, as child is endpoint sharding.
if ei, ok := b.Balancer.(balancer.ExitIdler); ok {
ei.ExitIdle()
}
}


@@ -213,6 +213,7 @@ func WithReadBufferSize(s int) DialOption {
func WithInitialWindowSize(s int32) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.InitialWindowSize = s
o.copts.StaticWindowSize = true
})
}
@@ -222,6 +223,26 @@ func WithInitialWindowSize(s int32) DialOption {
func WithInitialConnWindowSize(s int32) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.InitialConnWindowSize = s
o.copts.StaticWindowSize = true
})
}
// WithStaticStreamWindowSize returns a DialOption which sets the initial
// stream window size to the value provided and disables dynamic flow control.
func WithStaticStreamWindowSize(s int32) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.InitialWindowSize = s
o.copts.StaticWindowSize = true
})
}
// WithStaticConnWindowSize returns a DialOption which sets the initial
// connection window size to the value provided and disables dynamic flow
// control.
func WithStaticConnWindowSize(s int32) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.InitialConnWindowSize = s
o.copts.StaticWindowSize = true
})
}
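
A hedged dial sketch using the new options (target and credentials are placeholders; assumes the usual `grpc` and `credentials/insecure` imports):

```go
func dial() (*grpc.ClientConn, error) {
	return grpc.NewClient("dns:///example.internal:443", // placeholder target
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		// Fix both window sizes and opt out of BDP-based dynamic flow control:
		grpc.WithStaticStreamWindowSize(1<<20),
		grpc.WithStaticConnWindowSize(1<<20),
	)
}
```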


@@ -188,13 +188,13 @@ type HealthServer interface {
type UnimplementedHealthServer struct{}
func (UnimplementedHealthServer) Check(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Check not implemented")
return nil, status.Error(codes.Unimplemented, "method Check not implemented")
}
func (UnimplementedHealthServer) List(context.Context, *HealthListRequest) (*HealthListResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method List not implemented")
return nil, status.Error(codes.Unimplemented, "method List not implemented")
}
func (UnimplementedHealthServer) Watch(*HealthCheckRequest, grpc.ServerStreamingServer[HealthCheckResponse]) error {
return status.Errorf(codes.Unimplemented, "method Watch not implemented")
return status.Error(codes.Unimplemented, "method Watch not implemented")
}
func (UnimplementedHealthServer) testEmbeddedByValue() {}


@@ -223,15 +223,7 @@ func (gsb *Balancer) ExitIdle() {
// There is no need to protect this read with a mutex, as the write to the
// Balancer field happens in SwitchTo, which completes before this can be
// called.
if ei, ok := balToUpdate.Balancer.(balancer.ExitIdler); ok {
ei.ExitIdle()
return
}
gsb.mu.Lock()
defer gsb.mu.Unlock()
for sc := range balToUpdate.subconns {
sc.Connect()
}
balToUpdate.ExitIdle()
}
// updateSubConnState forwards the update to the appropriate child.


@@ -33,10 +33,6 @@ var (
// "GRPC_RING_HASH_CAP". This does not override the default bounds
// checking which NACKs configs specifying ring sizes > 8*1024*1024 (~8M).
RingHashCap = uint64FromEnv("GRPC_RING_HASH_CAP", 4096, 1, 8*1024*1024)
// LeastRequestLB is set if we should support the least_request_experimental
// LB policy, which can be enabled by setting the environment variable
// "GRPC_EXPERIMENTAL_ENABLE_LEAST_REQUEST" to "true".
LeastRequestLB = boolFromEnv("GRPC_EXPERIMENTAL_ENABLE_LEAST_REQUEST", true)
// ALTSMaxConcurrentHandshakes is the maximum number of concurrent ALTS
// handshakes that can be performed.
ALTSMaxConcurrentHandshakes = uint64FromEnv("GRPC_ALTS_MAX_CONCURRENT_HANDSHAKES", 100, 1, 100)


@@ -40,6 +40,13 @@ var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
e.SetMaxDynamicTableSizeLimit(v)
}
// itemNodePool is used to reduce heap allocations.
var itemNodePool = sync.Pool{
New: func() any {
return &itemNode{}
},
}
type itemNode struct {
it any
next *itemNode
@@ -51,7 +58,9 @@ type itemList struct {
}
func (il *itemList) enqueue(i any) {
n := &itemNode{it: i}
n := itemNodePool.Get().(*itemNode)
n.next = nil
n.it = i
if il.tail == nil {
il.head, il.tail = n, n
return
@@ -71,7 +80,9 @@ func (il *itemList) dequeue() any {
return nil
}
i := il.head.it
temp := il.head
il.head = il.head.next
itemNodePool.Put(temp)
if il.head == nil {
il.tail = nil
}
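
The enqueue/dequeue changes above apply the standard `sync.Pool` recycle pattern; a minimal standalone sketch (assumes a `sync` import):

```go
type node struct {
	it   any
	next *node
}

var nodePool = sync.Pool{New: func() any { return &node{} }}

func getNode(it any) *node {
	n := nodePool.Get().(*node) // reuse a pooled node when available
	n.it, n.next = it, nil      // reset fields before use
	return n
}

func putNode(n *node) { nodePool.Put(n) } // return for reuse once unlinked
```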
@@ -146,10 +157,11 @@ type earlyAbortStream struct {
func (*earlyAbortStream) isTransportResponseFrame() bool { return false }
type dataFrame struct {
streamID uint32
endStream bool
h []byte
reader mem.Reader
streamID uint32
endStream bool
h []byte
data mem.BufferSlice
processing bool
// onEachWrite is called every time
// a part of data is written out.
onEachWrite func()
@@ -234,6 +246,7 @@ type outStream struct {
itl *itemList
bytesOutStanding int
wq *writeQuota
reader mem.Reader
next *outStream
prev *outStream
@@ -461,7 +474,9 @@ func (c *controlBuffer) finish() {
v.onOrphaned(ErrConnClosing)
}
case *dataFrame:
_ = v.reader.Close()
if !v.processing {
v.data.Free()
}
}
}
@@ -650,10 +665,11 @@ func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
func (l *loopyWriter) registerStreamHandler(h *registerStream) {
str := &outStream{
id: h.streamID,
state: empty,
itl: &itemList{},
wq: h.wq,
id: h.streamID,
state: empty,
itl: &itemList{},
wq: h.wq,
reader: mem.BufferSlice{}.Reader(),
}
l.estdStreams[h.streamID] = str
}
@@ -685,10 +701,11 @@ func (l *loopyWriter) headerHandler(h *headerFrame) error {
}
// Case 2: Client wants to originate stream.
str := &outStream{
id: h.streamID,
state: empty,
itl: &itemList{},
wq: h.wq,
id: h.streamID,
state: empty,
itl: &itemList{},
wq: h.wq,
reader: mem.BufferSlice{}.Reader(),
}
return l.originateStream(str, h)
}
@@ -790,10 +807,13 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
// a RST_STREAM before stream initialization thus the stream might
// not be established yet.
delete(l.estdStreams, c.streamID)
str.reader.Close()
str.deleteSelf()
for head := str.itl.dequeueAll(); head != nil; head = head.next {
if df, ok := head.it.(*dataFrame); ok {
_ = df.reader.Close()
if !df.processing {
df.data.Free()
}
}
}
}
@@ -928,7 +948,13 @@ func (l *loopyWriter) processData() (bool, error) {
if str == nil {
return true, nil
}
reader := str.reader
dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
if !dataItem.processing {
dataItem.processing = true
str.reader.Reset(dataItem.data)
dataItem.data.Free()
}
// A data item is represented by a dataFrame, since it later translates into
// multiple HTTP2 data frames.
// Every dataFrame has two buffers; h that keeps grpc-message header and data
@@ -936,13 +962,13 @@ func (l *loopyWriter) processData() (bool, error) {
// from data is copied to h to make as big as the maximum possible HTTP2 frame
// size.
if len(dataItem.h) == 0 && dataItem.reader.Remaining() == 0 { // Empty data frame
if len(dataItem.h) == 0 && reader.Remaining() == 0 { // Empty data frame
// Client sends out empty data frame with endStream = true
if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
return false, err
}
str.itl.dequeue() // remove the empty data item from stream
_ = dataItem.reader.Close()
_ = reader.Close()
if str.itl.isEmpty() {
str.state = empty
} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
@@ -971,8 +997,8 @@ func (l *loopyWriter) processData() (bool, error) {
}
// Compute how much of the header and data we can send within quota and max frame length
hSize := min(maxSize, len(dataItem.h))
dSize := min(maxSize-hSize, dataItem.reader.Remaining())
remainingBytes := len(dataItem.h) + dataItem.reader.Remaining() - hSize - dSize
dSize := min(maxSize-hSize, reader.Remaining())
remainingBytes := len(dataItem.h) + reader.Remaining() - hSize - dSize
size := hSize + dSize
var buf *[]byte
@@ -993,7 +1019,7 @@ func (l *loopyWriter) processData() (bool, error) {
defer pool.Put(buf)
copy((*buf)[:hSize], dataItem.h)
_, _ = dataItem.reader.Read((*buf)[hSize:])
_, _ = reader.Read((*buf)[hSize:])
}
// Now that outgoing flow controls are checked we can replenish str's write quota
@@ -1014,7 +1040,7 @@ func (l *loopyWriter) processData() (bool, error) {
dataItem.h = dataItem.h[hSize:]
if remainingBytes == 0 { // All the data from that message was written out.
_ = dataItem.reader.Close()
_ = reader.Close()
str.itl.dequeue()
}
if str.itl.isEmpty() {
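Taken together, these controlbuf hunks change buffer ownership: a dataFrame now carries the mem.BufferSlice itself plus a processing flag instead of a per-frame mem.Reader, each outStream owns a single reusable reader, the sender takes an extra reference before enqueueing, and whichever side still holds an unprocessed frame at teardown calls Free(). A hypothetical, self-contained sketch of the ref-counting contract this relies on (refBuf is illustrative, not the real mem implementation):

package main

import (
	"fmt"
	"sync/atomic"
)

// refBuf mimics the contract: every holder takes a reference before sharing
// the buffer and calls Free exactly once; memory is reclaimed at refcount 0.
type refBuf struct {
	refs int32
	data []byte
}

func newRefBuf(data []byte) *refBuf { return &refBuf{refs: 1, data: data} }

func (b *refBuf) Ref() { atomic.AddInt32(&b.refs, 1) }

func (b *refBuf) Free() {
	if atomic.AddInt32(&b.refs, -1) == 0 {
		b.data = nil // last holder releases the memory
	}
}

func main() {
	buf := newRefBuf([]byte("payload"))
	buf.Ref()                    // sender: extra ref before enqueueing, as in write()
	buf.Free()                   // loopy writer: releases after Reset/copy-out
	buf.Free()                   // sender's original reference
	fmt.Println(buf.data == nil) // true: fully released
}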

View File

@@ -309,11 +309,9 @@ func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
scheme = "https"
}
}
dynamicWindow := true
icwz := int32(initialWindowSize)
if opts.InitialConnWindowSize >= defaultWindowSize {
icwz = opts.InitialConnWindowSize
dynamicWindow = false
}
writeBufSize := opts.WriteBufferSize
readBufSize := opts.ReadBufferSize
@@ -381,9 +379,8 @@ func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
t.controlBuf = newControlBuffer(t.ctxDone)
if opts.InitialWindowSize >= defaultWindowSize {
t.initialWindowSize = opts.InitialWindowSize
dynamicWindow = false
}
if dynamicWindow {
if !opts.StaticWindowSize {
t.bdpEst = &bdpEstimator{
bdp: initialWindowSize,
updateFlowControl: t.updateFlowControl,
@@ -1091,32 +1088,29 @@ func (t *http2Client) GracefulClose() {
// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
// should proceed only if Write returns nil.
func (t *http2Client) write(s *ClientStream, hdr []byte, data mem.BufferSlice, opts *WriteOptions) error {
reader := data.Reader()
if opts.Last {
// If it's the last message, update stream state.
if !s.compareAndSwapState(streamActive, streamWriteDone) {
_ = reader.Close()
return errStreamDone
}
} else if s.getState() != streamActive {
_ = reader.Close()
return errStreamDone
}
df := &dataFrame{
streamID: s.id,
endStream: opts.Last,
h: hdr,
reader: reader,
data: data,
}
if hdr != nil || df.reader.Remaining() != 0 { // If it's not an empty data frame, check quota.
if err := s.wq.get(int32(len(hdr) + df.reader.Remaining())); err != nil {
_ = reader.Close()
dataLen := data.Len()
if hdr != nil || dataLen != 0 { // If it's not an empty data frame, check quota.
if err := s.wq.get(int32(len(hdr) + dataLen)); err != nil {
return err
}
}
data.Ref()
if err := t.controlBuf.put(df); err != nil {
_ = reader.Close()
data.Free()
return err
}
t.incrMsgSent()

View File

@@ -132,6 +132,10 @@ type http2Server struct {
maxStreamID uint32 // max stream ID ever seen
logger *grpclog.PrefixLogger
// setResetPingStrikes is stored as a closure instead of making this a
// method on http2Server to avoid a heap allocation when converting a method
// to a closure for passing to frame objects.
setResetPingStrikes func()
}
// NewServerTransport creates a http2 transport with conn and configuration
@@ -176,16 +180,13 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
Val: config.MaxStreams,
})
}
dynamicWindow := true
iwz := int32(initialWindowSize)
if config.InitialWindowSize >= defaultWindowSize {
iwz = config.InitialWindowSize
dynamicWindow = false
}
icwz := int32(initialWindowSize)
if config.InitialConnWindowSize >= defaultWindowSize {
icwz = config.InitialConnWindowSize
dynamicWindow = false
}
if iwz != defaultWindowSize {
isettings = append(isettings, http2.Setting{
@@ -266,6 +267,9 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
initialWindowSize: iwz,
bufferPool: config.BufferPool,
}
t.setResetPingStrikes = func() {
atomic.StoreUint32(&t.resetPingStrikes, 1)
}
var czSecurity credentials.ChannelzSecurityValue
if au, ok := authInfo.(credentials.ChannelzSecurityInfo); ok {
czSecurity = au.GetSecurityValue()
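Storing setResetPingStrikes in a field initialized once sidesteps Go's bound-method-value allocation: every evaluation of t.setResetPingStrikes as a func() value would otherwise allocate a closure capturing t, and the transport attaches this callback to every outgoing data frame. A small illustrative sketch (names are hypothetical):

package main

import "sync/atomic"

type pinger struct {
	strikes      uint32
	resetStrikes func() // built once here, reused for every frame
}

func newPinger() *pinger {
	p := &pinger{}
	p.resetStrikes = func() { atomic.StoreUint32(&p.strikes, 1) }
	return p
}

func onEachWrite(cb func()) { cb() }

func main() {
	p := newPinger()
	onEachWrite(p.resetStrikes) // passes the stored closure: no new allocation
	// By contrast, passing a method value like onEachWrite(p.reset) would
	// allocate a fresh bound-method closure at each evaluation.
}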
@@ -285,7 +289,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
t.logger = prefixLoggerForServerTransport(t)
t.controlBuf = newControlBuffer(t.done)
if dynamicWindow {
if !config.StaticWindowSize {
t.bdpEst = &bdpEstimator{
bdp: initialWindowSize,
updateFlowControl: t.updateFlowControl,
@@ -596,10 +600,25 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
return nil
}
}
if s.ctx.Err() != nil {
t.mu.Unlock()
// Early abort in case the timeout was zero or so low it already fired.
t.controlBuf.put(&earlyAbortStream{
httpStatus: http.StatusOK,
streamID: s.id,
contentSubtype: s.contentSubtype,
status: status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()),
rst: !frame.StreamEnded(),
})
return nil
}
t.activeStreams[streamID] = s
if len(t.activeStreams) == 1 {
t.idle = time.Time{}
}
// Start a timer to close the stream on reaching the deadline.
if timeoutSet {
// We need to wait for s.cancel to be updated before calling
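The new s.ctx.Err() check above means a stream whose decoded timeout has already expired is refused up front with a real DeadlineExceeded status instead of being registered and torn down later. A client-side sketch of what this looks like (the invoke callback stands in for any generated unary stub; it is an assumption, not part of this diff):

package clientdemo

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// checkEarlyAbort issues a call whose deadline is effectively already expired.
func checkEarlyAbort(invoke func(context.Context) error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
	defer cancel()
	if err := invoke(ctx); status.Code(err) == codes.DeadlineExceeded {
		fmt.Println("server aborted the stream before any handler ran")
	}
}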
@@ -1016,10 +1035,6 @@ func (t *http2Server) writeHeader(s *ServerStream, md metadata.MD) error {
return nil
}
func (t *http2Server) setResetPingStrikes() {
atomic.StoreUint32(&t.resetPingStrikes, 1)
}
func (t *http2Server) writeHeaderLocked(s *ServerStream) error {
// TODO(mmukhi): Benchmark if the performance gets better if we count the metadata and other header fields
// first and create a slice of that exact size.
@@ -1132,17 +1147,13 @@ func (t *http2Server) writeStatus(s *ServerStream, st *status.Status) error {
// Write converts the data into HTTP2 data frames and sends them out. A non-nil
// error is returned if it fails (e.g., framing error, transport error).
func (t *http2Server) write(s *ServerStream, hdr []byte, data mem.BufferSlice, _ *WriteOptions) error {
reader := data.Reader()
if !s.isHeaderSent() { // Headers haven't been written yet.
if err := t.writeHeader(s, nil); err != nil {
_ = reader.Close()
return err
}
} else {
// Writing headers checks for this condition.
if s.getState() == streamDone {
_ = reader.Close()
return t.streamContextErr(s)
}
}
@@ -1150,15 +1161,16 @@ func (t *http2Server) write(s *ServerStream, hdr []byte, data mem.BufferSlice, _
df := &dataFrame{
streamID: s.id,
h: hdr,
reader: reader,
data: data,
onEachWrite: t.setResetPingStrikes,
}
if err := s.wq.get(int32(len(hdr) + df.reader.Remaining())); err != nil {
_ = reader.Close()
dataLen := data.Len()
if err := s.wq.get(int32(len(hdr) + dataLen)); err != nil {
return t.streamContextErr(s)
}
data.Ref()
if err := t.controlBuf.put(df); err != nil {
_ = reader.Close()
data.Free()
return err
}
t.incrMsgSent()

View File

@@ -200,9 +200,6 @@ func decodeTimeout(s string) (time.Duration, error) {
if err != nil {
return 0, err
}
if t == 0 {
return 0, fmt.Errorf("transport: timeout must be positive: %q", s)
}
const maxHours = math.MaxInt64 / uint64(time.Hour)
if d == time.Hour && t > maxHours {
// This timeout would overflow math.MaxInt64; clamp it.
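With the t == 0 guard removed, decodeTimeout now accepts a zero grpc-timeout value such as "0S"; an already-expired deadline is handled by the early-abort path in operateHeaders rather than rejected at decode time. A simplified re-implementation of the wire format for reference (it omits the MaxInt64 clamping shown above):

package timeoutdemo

import (
	"fmt"
	"strconv"
	"time"
)

var units = map[byte]time.Duration{
	'H': time.Hour, 'M': time.Minute, 'S': time.Second,
	'm': time.Millisecond, 'u': time.Microsecond, 'n': time.Nanosecond,
}

// parseGRPCTimeout decodes a grpc-timeout header value: ASCII digits followed
// by a one-letter unit, e.g. "100m" is 100ms. "0S" now decodes to a valid 0.
func parseGRPCTimeout(s string) (time.Duration, error) {
	if len(s) < 2 {
		return 0, fmt.Errorf("transport: malformed grpc-timeout %q", s)
	}
	unit, ok := units[s[len(s)-1]]
	if !ok {
		return 0, fmt.Errorf("transport: bad unit in grpc-timeout %q", s)
	}
	v, err := strconv.ParseUint(s[:len(s)-1], 10, 64)
	if err != nil {
		return 0, err
	}
	return time.Duration(v) * unit, nil
}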

View File

@@ -466,6 +466,7 @@ type ServerConfig struct {
MaxHeaderListSize *uint32
HeaderTableSize *uint32
BufferPool mem.BufferPool
StaticWindowSize bool
}
// ConnectOptions covers all relevant options for communicating with the server.
@@ -504,6 +505,8 @@ type ConnectOptions struct {
MaxHeaderListSize *uint32
// The mem.BufferPool to use when reading/writing to the wire.
BufferPool mem.BufferPool
// StaticWindowSize, when true, disables dynamic window sizing (BDP estimation).
StaticWindowSize bool
}
// WriteOptions provides additional hints and information for message

View File

@@ -137,6 +137,9 @@ type Reader interface {
Close() error
// Remaining returns the number of unread bytes remaining in the slice.
Remaining() int
// Reset frees the currently held buffer slice and starts reading from the
// provided slice. This allows reusing the reader object.
Reset(s BufferSlice)
}
type sliceReader struct {
@@ -150,6 +153,14 @@ func (r *sliceReader) Remaining() int {
return r.len
}
func (r *sliceReader) Reset(s BufferSlice) {
r.data.Free()
s.Ref()
r.data = s
r.len = s.Len()
r.bufferIdx = 0
}
func (r *sliceReader) Close() error {
r.data.Free()
r.data = nil
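A usage sketch for the new Reset: one reader can now be recycled across messages instead of allocating one per BufferSlice. This assumes mem.SliceBuffer, the plain []byte-backed Buffer in the same package:

package memdemo

import (
	"fmt"
	"io"

	"google.golang.org/grpc/mem"
)

func readMessages(msgs [][]byte) {
	r := mem.BufferSlice{}.Reader() // empty reader, as the transport now keeps per stream
	for _, msg := range msgs {
		// Reset frees whatever the reader held and takes a ref on the new slice.
		r.Reset(mem.BufferSlice{mem.SliceBuffer(msg)})
		out := make([]byte, r.Remaining())
		_, _ = io.ReadFull(r, out)
		fmt.Printf("%s\n", out)
	}
	_ = r.Close() // releases the final reference
}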

View File

@@ -90,7 +90,7 @@ type ServerReflectionServer interface {
type UnimplementedServerReflectionServer struct{}
func (UnimplementedServerReflectionServer) ServerReflectionInfo(grpc.BidiStreamingServer[ServerReflectionRequest, ServerReflectionResponse]) error {
return status.Errorf(codes.Unimplemented, "method ServerReflectionInfo not implemented")
return status.Error(codes.Unimplemented, "method ServerReflectionInfo not implemented")
}
func (UnimplementedServerReflectionServer) testEmbeddedByValue() {}

View File

@@ -87,7 +87,7 @@ type ServerReflectionServer interface {
type UnimplementedServerReflectionServer struct{}
func (UnimplementedServerReflectionServer) ServerReflectionInfo(grpc.BidiStreamingServer[ServerReflectionRequest, ServerReflectionResponse]) error {
return status.Errorf(codes.Unimplemented, "method ServerReflectionInfo not implemented")
return status.Error(codes.Unimplemented, "method ServerReflectionInfo not implemented")
}
func (UnimplementedServerReflectionServer) testEmbeddedByValue() {}
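The same one-line cleanup lands in both reflection stubs: status.Errorf runs a Sprintf pass even when the message contains no formatting verbs, while status.Error stores the constant message verbatim.

// Equivalent results; Error skips the needless formatting pass.
status.Errorf(codes.Unimplemented, "method X not implemented") // scans for verbs
status.Error(codes.Unimplemented, "method X not implemented")  // stores as-is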

View File

@@ -179,6 +179,7 @@ type serverOptions struct {
numServerWorkers uint32
bufferPool mem.BufferPool
waitForHandlers bool
staticWindowSize bool
}
var defaultServerOptions = serverOptions{
@@ -279,6 +280,7 @@ func ReadBufferSize(s int) ServerOption {
func InitialWindowSize(s int32) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.initialWindowSize = s
o.staticWindowSize = true
})
}
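Note that InitialWindowSize (and InitialConnWindowSize below) now set staticWindowSize as well, preserving their historical side effect of disabling BDP-based resizing, which the transports previously inferred from the window values themselves. With a value of at least 64 KiB, this option and the new StaticStreamWindowSize introduced below configure flow control identically; a sketch:

package windowdemo

import "google.golang.org/grpc"

func equivalentServers() (*grpc.Server, *grpc.Server) {
	a := grpc.NewServer(grpc.InitialWindowSize(1 << 20))      // implicit static window
	b := grpc.NewServer(grpc.StaticStreamWindowSize(1 << 20)) // explicit static window
	return a, b
}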
@@ -287,6 +289,29 @@ func InitialWindowSize(s int32) ServerOption {
func InitialConnWindowSize(s int32) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.initialConnWindowSize = s
o.staticWindowSize = true
})
}
// StaticStreamWindowSize returns a ServerOption to set the initial stream
// window size to the value provided and disables dynamic flow control.
// The lower bound for window size is 64K and any value smaller than that
// will be ignored.
func StaticStreamWindowSize(s int32) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.initialWindowSize = s
o.staticWindowSize = true
})
}
// StaticConnWindowSize returns a ServerOption to set the initial connection
// window size to the value provided and disables dynamic flow control.
// The lower bound for window size is 64K and any value smaller than that
// will be ignored.
func StaticConnWindowSize(s int32) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.initialConnWindowSize = s
o.staticWindowSize = true
})
}
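A combined usage sketch for the two new server options, pinning both windows and keeping the BDP estimator out of the picture:

package serverdemo

import "google.golang.org/grpc"

func newStaticWindowServer() *grpc.Server {
	return grpc.NewServer(
		grpc.StaticStreamWindowSize(1<<20), // 1 MiB per-stream window, never resized
		grpc.StaticConnWindowSize(4<<20),   // 4 MiB connection window, never resized
	)
}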
@@ -986,6 +1011,7 @@ func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
MaxHeaderListSize: s.opts.maxHeaderListSize,
HeaderTableSize: s.opts.headerTableSize,
BufferPool: s.opts.bufferPool,
StaticWindowSize: s.opts.staticWindowSize,
}
st, err := transport.NewServerTransport(c, config)
if err != nil {

View File

@@ -1171,7 +1171,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
} else if err != nil {
return toRPCErr(err)
}
return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
return status.Errorf(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
}
func (a *csAttempt) finish(err error) {
@@ -1495,7 +1495,7 @@ func (as *addrConnStream) RecvMsg(m any) (err error) {
} else if err != nil {
return toRPCErr(err)
}
return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
return status.Errorf(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
}
func (as *addrConnStream) finish(err error) {
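Both hunks replace an untyped toRPCErr(errors.New(...)) with a proper codes.Internal status, so the condition is now detectable programmatically. A hedged helper sketch (the message text comes from the diff; the helper name is hypothetical):

package streamdemo

import (
	"strings"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// isCardinalityViolation reports whether err is the new Internal status
// returned when a non-server-streaming RPC receives more than one response.
func isCardinalityViolation(err error) bool {
	s, ok := status.FromError(err)
	return ok && s.Code() == codes.Internal &&
		strings.Contains(s.Message(), "cardinality violation")
}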

View File

@@ -19,4 +19,4 @@
package grpc
// Version is the current grpc version.
const Version = "1.73.0"
const Version = "1.74.0-dev"

6
vendor/modules.txt
View File

@@ -672,7 +672,7 @@ github.com/gogo/protobuf/protoc-gen-gogo/descriptor
# github.com/golang-jwt/jwt/v4 v4.5.2
## explicit; go 1.16
github.com/golang-jwt/jwt/v4
# github.com/golang-jwt/jwt/v5 v5.2.2
# github.com/golang-jwt/jwt/v5 v5.2.3
## explicit; go 1.18
github.com/golang-jwt/jwt/v5
# github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
@@ -1213,7 +1213,7 @@ github.com/open-policy-agent/opa/v1/version
# github.com/opencloud-eu/libre-graph-api-go v1.0.8-0.20250707143759-32eaae12b2ce
## explicit; go 1.18
github.com/opencloud-eu/libre-graph-api-go
# github.com/opencloud-eu/reva/v2 v2.34.1-0.20250716074813-cfe225225b23
# github.com/opencloud-eu/reva/v2 v2.34.1-0.20250717130558-81c1803fb574
## explicit; go 1.24.1
github.com/opencloud-eu/reva/v2/cmd/revad/internal/grace
github.com/opencloud-eu/reva/v2/cmd/revad/runtime
@@ -2333,7 +2333,7 @@ google.golang.org/genproto/googleapis/api/httpbody
## explicit; go 1.23.0
google.golang.org/genproto/googleapis/rpc/errdetails
google.golang.org/genproto/googleapis/rpc/status
# google.golang.org/grpc v1.73.0
# google.golang.org/grpc v1.74.0
## explicit; go 1.23.0
google.golang.org/grpc
google.golang.org/grpc/attributes