mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-03-13 18:00:34 -05:00
fix(thumbnails): Implement ratelimit for the grpc service
This moves the ratelimit ('THUMBNAILS_MAX_CONCURRENT_REQUESTS') from the
HTTP endpoint to the GRPC endpoint. The HTTP endpoint is just used for
downloading already created thumbnails. But the resource consuming part of
thumbnail generation happens in the GRPC service.
This commit is contained in:
committed by
Ralf Haferkamp
parent
9e978e290a
commit
e2d7251893
7
changelog/unreleased/fix-thumbnail-ratelimit.md
Normal file
7
changelog/unreleased/fix-thumbnail-ratelimit.md
Normal file
@@ -0,0 +1,7 @@
|
||||
Bugfix: Thumbnail request limit
|
||||
|
||||
The `THUMBNAILS_MAX_CONCURRENT_REQUESTS` setting was not working correctly.
|
||||
Previously it was just limiting the number of concurrent thumbnail downloads.
|
||||
Now the limit is applied to the number thumbnail generations requests.
|
||||
|
||||
https://github.com/owncloud/ocis/pull/10225
|
||||
39
ocis-pkg/service/grpc/handler/ratelimiter/ratelimiter.go
Normal file
39
ocis-pkg/service/grpc/handler/ratelimiter/ratelimiter.go
Normal file
@@ -0,0 +1,39 @@
|
||||
package ratelimiter
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go-micro.dev/v4/errors"
|
||||
"go-micro.dev/v4/server"
|
||||
)
|
||||
|
||||
// NewHandlerWrapper creates a blocking server side rate limiter.
|
||||
func NewHandlerWrapper(limit int) server.HandlerWrapper {
|
||||
if limit <= 0 {
|
||||
return func(h server.HandlerFunc) server.HandlerFunc {
|
||||
return h
|
||||
}
|
||||
}
|
||||
|
||||
token := make(chan struct{}, limit)
|
||||
for i := 0; i < limit; i++ {
|
||||
token <- struct{}{}
|
||||
}
|
||||
|
||||
return func(h server.HandlerFunc) server.HandlerFunc {
|
||||
return func(ctx context.Context, req server.Request, rsp interface{}) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case t := <-token:
|
||||
defer func() {
|
||||
token <- t
|
||||
}()
|
||||
return h(ctx, req, rsp)
|
||||
default:
|
||||
return errors.New("go.micro.server", "Rate limit exceeded", 429)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/log"
|
||||
"go-micro.dev/v4/server"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
@@ -12,16 +13,17 @@ type Option func(o *Options)
|
||||
|
||||
// Options defines the available options for this package.
|
||||
type Options struct {
|
||||
Logger log.Logger
|
||||
Namespace string
|
||||
Name string
|
||||
Version string
|
||||
Address string
|
||||
TLSEnabled bool
|
||||
TLSCert string
|
||||
TLSKey string
|
||||
Context context.Context
|
||||
TraceProvider trace.TracerProvider
|
||||
Logger log.Logger
|
||||
Namespace string
|
||||
Name string
|
||||
Version string
|
||||
Address string
|
||||
TLSEnabled bool
|
||||
TLSCert string
|
||||
TLSKey string
|
||||
Context context.Context
|
||||
TraceProvider trace.TracerProvider
|
||||
HandlerWrappers []server.HandlerWrapper
|
||||
}
|
||||
|
||||
// newOptions initializes the available default options.
|
||||
@@ -100,3 +102,9 @@ func TraceProvider(tp trace.TracerProvider) Option {
|
||||
o.TraceProvider = tp
|
||||
}
|
||||
}
|
||||
|
||||
func HandlerWrappers(w ...server.HandlerWrapper) Option {
|
||||
return func(o *Options) {
|
||||
o.HandlerWrappers = w
|
||||
}
|
||||
}
|
||||
|
||||
@@ -73,6 +73,7 @@ func NewServiceWithClient(client client.Client, opts ...Option) (Service, error)
|
||||
micro.WrapHandler(mtracer.NewHandlerWrapper(
|
||||
mtracer.WithTraceProvider(sopts.TraceProvider),
|
||||
)),
|
||||
micro.WrapHandler(sopts.HandlerWrappers...),
|
||||
micro.WrapSubscriber(mtracer.NewSubscriberWrapper(
|
||||
mtracer.WithTraceProvider(sopts.TraceProvider),
|
||||
)),
|
||||
|
||||
@@ -59,6 +59,7 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
grpc.Address(cfg.GRPC.Addr),
|
||||
grpc.Metrics(m),
|
||||
grpc.TraceProvider(traceProvider),
|
||||
grpc.MaxConcurrentRequests(cfg.GRPC.MaxConcurrentRequests),
|
||||
)
|
||||
|
||||
gr.Add(service.Run, func(_ error) {
|
||||
@@ -91,7 +92,6 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
http.Metrics(m),
|
||||
http.Namespace(cfg.HTTP.Namespace),
|
||||
http.TraceProvider(traceProvider),
|
||||
http.MaxConcurrentRequests(cfg.HTTP.MaxConcurrentRequests),
|
||||
)
|
||||
if err != nil {
|
||||
logger.Info().
|
||||
|
||||
@@ -28,14 +28,14 @@ func DefaultConfig() *config.Config {
|
||||
Zpages: false,
|
||||
},
|
||||
GRPC: config.GRPCConfig{
|
||||
Addr: "127.0.0.1:9185",
|
||||
Namespace: "com.owncloud.api",
|
||||
Addr: "127.0.0.1:9185",
|
||||
Namespace: "com.owncloud.api",
|
||||
MaxConcurrentRequests: 0,
|
||||
},
|
||||
HTTP: config.HTTP{
|
||||
Addr: "127.0.0.1:9186",
|
||||
Root: "/thumbnails",
|
||||
Namespace: "com.owncloud.web",
|
||||
MaxConcurrentRequests: 0,
|
||||
Addr: "127.0.0.1:9186",
|
||||
Root: "/thumbnails",
|
||||
Namespace: "com.owncloud.web",
|
||||
CORS: config.CORS{
|
||||
AllowedOrigins: []string{"*"},
|
||||
AllowedMethods: []string{"GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"},
|
||||
|
||||
@@ -4,7 +4,8 @@ import "github.com/owncloud/ocis/v2/ocis-pkg/shared"
|
||||
|
||||
// GRPCConfig defines the available grpc configuration.
|
||||
type GRPCConfig struct {
|
||||
Addr string `yaml:"addr" env:"THUMBNAILS_GRPC_ADDR" desc:"The bind address of the GRPC service." introductionVersion:"pre5.0"`
|
||||
Namespace string `yaml:"-"`
|
||||
TLS *shared.GRPCServiceTLS `yaml:"tls"`
|
||||
Addr string `yaml:"addr" env:"THUMBNAILS_GRPC_ADDR" desc:"The bind address of the GRPC service." introductionVersion:"pre5.0"`
|
||||
Namespace string `yaml:"-"`
|
||||
TLS *shared.GRPCServiceTLS `yaml:"tls"`
|
||||
MaxConcurrentRequests int `yaml:"max_concurrent_requests" env:"THUMBNAILS_MAX_CONCURRENT_REQUESTS" desc:"Number of maximum concurrent thumbnail requests. Default is 0 which is unlimited." introductionVersion:"6.0.0"`
|
||||
}
|
||||
|
||||
@@ -12,10 +12,9 @@ type CORS struct {
|
||||
|
||||
// HTTP defines the available http configuration.
|
||||
type HTTP struct {
|
||||
Addr string `yaml:"addr" env:"THUMBNAILS_HTTP_ADDR" desc:"The bind address of the HTTP service." introductionVersion:"pre5.0"`
|
||||
TLS shared.HTTPServiceTLS `yaml:"tls"`
|
||||
Root string `yaml:"root" env:"THUMBNAILS_HTTP_ROOT" desc:"Subdirectory that serves as the root for this HTTP service." introductionVersion:"pre5.0"`
|
||||
Namespace string `yaml:"-"`
|
||||
CORS CORS `yaml:"cors"`
|
||||
MaxConcurrentRequests int `yaml:"max_concurrent_requests" env:"THUMBNAILS_MAX_CONCURRENT_REQUESTS" desc:"Number of maximum concurrent thumbnail requests. Default is 0 which is unlimited." introductionVersion:"6.0.0"`
|
||||
Addr string `yaml:"addr" env:"THUMBNAILS_HTTP_ADDR" desc:"The bind address of the HTTP service." introductionVersion:"pre5.0"`
|
||||
TLS shared.HTTPServiceTLS `yaml:"tls"`
|
||||
Root string `yaml:"root" env:"THUMBNAILS_HTTP_ROOT" desc:"Subdirectory that serves as the root for this HTTP service." introductionVersion:"pre5.0"`
|
||||
Namespace string `yaml:"-"`
|
||||
CORS CORS `yaml:"cors"`
|
||||
}
|
||||
|
||||
@@ -14,14 +14,15 @@ type Option func(o *Options)
|
||||
|
||||
// Options defines the available options for this package.
|
||||
type Options struct {
|
||||
Name string
|
||||
Address string
|
||||
Logger log.Logger
|
||||
Context context.Context
|
||||
Config *config.Config
|
||||
Metrics *metrics.Metrics
|
||||
Namespace string
|
||||
TraceProvider trace.TracerProvider
|
||||
Name string
|
||||
Address string
|
||||
Logger log.Logger
|
||||
Context context.Context
|
||||
Config *config.Config
|
||||
Metrics *metrics.Metrics
|
||||
Namespace string
|
||||
TraceProvider trace.TracerProvider
|
||||
MaxConcurrentRequests int
|
||||
}
|
||||
|
||||
// newOptions initializes the available default options.
|
||||
@@ -90,3 +91,10 @@ func TraceProvider(val trace.TracerProvider) Option {
|
||||
o.TraceProvider = val
|
||||
}
|
||||
}
|
||||
|
||||
// MaxConcurrentRequests provides a function to set the MaxConcurrentRequests option.
|
||||
func MaxConcurrentRequests(val int) Option {
|
||||
return func(o *Options) {
|
||||
o.MaxConcurrentRequests = val
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/registry"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/service/grpc"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/service/grpc/handler/ratelimiter"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/version"
|
||||
thumbnailssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/thumbnails/v0"
|
||||
svc "github.com/owncloud/ocis/v2/services/thumbnails/pkg/service/grpc/v0"
|
||||
@@ -32,6 +33,7 @@ func NewService(opts ...Option) grpc.Service {
|
||||
grpc.Context(options.Context),
|
||||
grpc.Version(version.GetString()),
|
||||
grpc.TraceProvider(options.TraceProvider),
|
||||
grpc.HandlerWrappers(ratelimiter.NewHandlerWrapper(options.MaxConcurrentRequests)),
|
||||
)
|
||||
if err != nil {
|
||||
options.Logger.Fatal().Err(err).Msg("Error creating thumbnail service")
|
||||
|
||||
@@ -16,14 +16,13 @@ type Option func(o *Options)
|
||||
|
||||
// Options defines the available options for this package.
|
||||
type Options struct {
|
||||
Namespace string
|
||||
Logger log.Logger
|
||||
Context context.Context
|
||||
Config *config.Config
|
||||
Metrics *metrics.Metrics
|
||||
Flags []cli.Flag
|
||||
TraceProvider trace.TracerProvider
|
||||
MaxConcurrentRequests int
|
||||
Namespace string
|
||||
Logger log.Logger
|
||||
Context context.Context
|
||||
Config *config.Config
|
||||
Metrics *metrics.Metrics
|
||||
Flags []cli.Flag
|
||||
TraceProvider trace.TracerProvider
|
||||
}
|
||||
|
||||
// newOptions initializes the available default options.
|
||||
@@ -82,10 +81,3 @@ func TraceProvider(traceProvider trace.TracerProvider) Option {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// MaxConcurrentRequests provides a function to set the MaxConcurrentRequests option.
|
||||
func MaxConcurrentRequests(val int) Option {
|
||||
return func(o *Options) {
|
||||
o.MaxConcurrentRequests = val
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,7 +40,6 @@ func Server(opts ...Option) (http.Service, error) {
|
||||
svc.Middleware(
|
||||
middleware.RealIP,
|
||||
middleware.RequestID,
|
||||
ocismiddleware.Throttle(options.MaxConcurrentRequests),
|
||||
ocismiddleware.Cors(
|
||||
cors.Logger(options.Logger),
|
||||
cors.AllowedOrigins(options.Config.HTTP.CORS.AllowedOrigins),
|
||||
|
||||
@@ -258,6 +258,8 @@ func (g Webdav) SpacesThumbnail(w http.ResponseWriter, r *http.Request) {
|
||||
// StatusTooEarly if file is processing
|
||||
renderError(w, r, errTooEarly(e.Detail))
|
||||
return
|
||||
case http.StatusTooManyRequests:
|
||||
renderError(w, r, errTooManyRequests(e.Detail))
|
||||
case http.StatusBadRequest:
|
||||
renderError(w, r, errBadRequest(e.Detail))
|
||||
case http.StatusForbidden:
|
||||
@@ -546,6 +548,10 @@ func errTooEarly(msg string) *errResponse {
|
||||
return newErrResponse(http.StatusTooEarly, msg)
|
||||
}
|
||||
|
||||
func errTooManyRequests(msg string) *errResponse {
|
||||
return newErrResponse(http.StatusTooManyRequests, msg)
|
||||
}
|
||||
|
||||
func renderError(w http.ResponseWriter, r *http.Request, err *errResponse) {
|
||||
render.Status(r, err.HTTPStatusCode)
|
||||
render.XML(w, r, err)
|
||||
|
||||
Reference in New Issue
Block a user