diff --git a/changelog/unreleased/concurrent-autoaccept.md b/changelog/unreleased/concurrent-autoaccept.md new file mode 100644 index 000000000..188f47eac --- /dev/null +++ b/changelog/unreleased/concurrent-autoaccept.md @@ -0,0 +1,5 @@ +Enhancement: Concurrent autoaccept for shares + +Shares for groups are now concurrently accepted. Tha default of 25 goroutinges can be changed with the new `FRONTEND_MAX_CONCURRENCY` environment variable. + +https://github.com/owncloud/ocis/pull/10476 diff --git a/services/frontend/pkg/command/events.go b/services/frontend/pkg/command/events.go index f80055da4..16701429b 100644 --- a/services/frontend/pkg/command/events.go +++ b/services/frontend/pkg/command/events.go @@ -3,7 +3,13 @@ package command import ( "context" "errors" + "sync" + gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" + group "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1" + user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" + collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1" "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/events/stream" "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" @@ -17,16 +23,9 @@ import ( "github.com/owncloud/ocis/v2/ocis-pkg/registry" "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc" "github.com/owncloud/ocis/v2/ocis-pkg/tracing" + settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0" "github.com/owncloud/ocis/v2/services/frontend/pkg/config" "github.com/owncloud/ocis/v2/services/settings/pkg/store/defaults" - - gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" - group "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1" - user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" - rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" - collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1" - - settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0" ) var _registeredEvents = []events.Unmarshaller{ @@ -90,7 +89,7 @@ func ListenForEvents(ctx context.Context, cfg *config.Config, l log.Logger) erro default: l.Error().Interface("event", e).Msg("unhandled event") case events.ShareCreated: - AutoAcceptShares(ev, cfg.AutoAcceptShares, l, gatewaySelector, valueService, cfg.ServiceAccount) + AutoAcceptShares(ev, cfg.AutoAcceptShares, l, gatewaySelector, valueService, cfg.ServiceAccount, cfg.MaxConcurrency) } case <-ctx.Done(): l.Info().Msg("context cancelled") @@ -100,7 +99,7 @@ func ListenForEvents(ctx context.Context, cfg *config.Config, l log.Logger) erro } // AutoAcceptShares automatically accepts shares if configured by the admin or user -func AutoAcceptShares(ev events.ShareCreated, autoAcceptDefault bool, l log.Logger, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], vs settingssvc.ValueService, cfg config.ServiceAccount) { +func AutoAcceptShares(ev events.ShareCreated, autoAcceptDefault bool, l log.Logger, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], vs settingssvc.ValueService, cfg config.ServiceAccount, maxConcurrency int) { gwc, err := gatewaySelector.Next() if err != nil { l.Error().Err(err).Msg("cannot get gateway client") @@ -118,36 +117,60 @@ func AutoAcceptShares(ev events.ShareCreated, autoAcceptDefault bool, l log.Logg return } - for _, uid := range userIDs { - if !autoAcceptShares(ctx, uid, autoAcceptDefault, vs) { - continue - } + work := make(chan *user.UserId, len(userIDs)) - gwc, err := gatewaySelector.Next() - if err != nil { - l.Error().Err(err).Msg("cannot get gateway client") - continue - } - resp, err := gwc.UpdateReceivedShare(ctx, updateShareRequest(ev.ShareID, uid)) - if err != nil { - l.Error().Err(err).Msg("error sending grpc request") - continue + // Distribute work + go func() { + defer close(work) + for _, userID := range userIDs { + select { + case work <- userID: + case <-ctx.Done(): + return + } } + }() - if code := resp.GetStatus().GetCode(); code != rpc.Code_CODE_OK { - // log unexpected status codes if a share cannot be accepted... - func() *zerolog.Event { - switch code { - // ... not found is not an error in the context of auto-accepting shares - case rpc.Code_CODE_NOT_FOUND: - return l.Debug() - default: - return l.Error() + wg := sync.WaitGroup{} + for i := 0; i < maxConcurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for userID := range work { + + if !autoAcceptShares(ctx, userID, autoAcceptDefault, vs) { + continue } - }().Interface("status", resp.GetStatus()).Str("userid", uid.GetOpaqueId()).Msg("unexpected status code while accepting share") - } + + gwc, err := gatewaySelector.Next() + if err != nil { + l.Error().Err(err).Msg("cannot get gateway client") + continue + } + resp, err := gwc.UpdateReceivedShare(ctx, updateShareRequest(ev.ShareID, userID)) + if err != nil { + l.Error().Err(err).Msg("error sending grpc request") + continue + } + + if code := resp.GetStatus().GetCode(); code != rpc.Code_CODE_OK { + // log unexpected status codes if a share cannot be accepted... + func() *zerolog.Event { + switch code { + // ... not found is not an error in the context of auto-accepting shares + case rpc.Code_CODE_NOT_FOUND: + return l.Debug() + default: + return l.Error() + } + }().Interface("status", resp.GetStatus()).Str("userid", userID.GetOpaqueId()).Msg("unexpected status code while accepting share") + } + } + }() } + // Wait for all goroutines to finish + wg.Wait() } func getUserIDs(ctx context.Context, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], uid *user.UserId, gid *group.GroupId) ([]*user.UserId, error) { diff --git a/services/frontend/pkg/config/config.go b/services/frontend/pkg/config/config.go index d4e2b51d5..3851927ca 100644 --- a/services/frontend/pkg/config/config.go +++ b/services/frontend/pkg/config/config.go @@ -38,8 +38,8 @@ type Config struct { DisableSSE bool `yaml:"disable_sse" env:"OCIS_DISABLE_SSE;FRONTEND_DISABLE_SSE" desc:"When set to true, clients are informed that the Server-Sent Events endpoint is not accessible." introductionVersion:"pre5.0"` DefaultLinkPermissions int `yaml:"default_link_permissions" env:"FRONTEND_DEFAULT_LINK_PERMISSIONS" desc:"Defines the default permissions a link is being created with. Possible values are 0 (= internal link, for instance members only) and 1 (= public link with viewer permissions). Defaults to 1." introductionVersion:"5.0"` - PublicURL string `yaml:"public_url" env:"OCIS_URL;FRONTEND_PUBLIC_URL" desc:"The public facing URL of the oCIS frontend." introductionVersion:"pre5.0"` - + PublicURL string `yaml:"public_url" env:"OCIS_URL;FRONTEND_PUBLIC_URL" desc:"The public facing URL of the oCIS frontend." introductionVersion:"pre5.0"` + MaxConcurrency int `yaml:"max_concurrency" env:"OCIS_MAX_CONCURRENCY;FRONTEND_MAX_CONCURRENCY" desc:"Maximum number of concurrent go-routines. Higher values can potentially get work done faster but will also cause more load on the system. Values of 0 or below will be ignored and the default value will be used." introductionVersion:"7.0.0"` AppHandler AppHandler `yaml:"app_handler"` Archiver Archiver `yaml:"archiver"` DataGateway DataGateway `yaml:"data_gateway"` diff --git a/services/frontend/pkg/config/defaults/defaultconfig.go b/services/frontend/pkg/config/defaults/defaultconfig.go index a448f8acd..23c91c1e9 100644 --- a/services/frontend/pkg/config/defaults/defaultconfig.go +++ b/services/frontend/pkg/config/defaults/defaultconfig.go @@ -130,6 +130,7 @@ func DefaultConfig() *config.Config { Cluster: "ocis-cluster", EnableTLS: false, }, + MaxConcurrency: 25, PasswordPolicy: config.PasswordPolicy{ MinCharacters: 8, MinLowerCaseCharacters: 1, @@ -195,5 +196,7 @@ func EnsureDefaults(cfg *config.Config) { // Sanitize sanitized the configuration func Sanitize(cfg *config.Config) { - // nothing to sanitize here atm + if cfg.MaxConcurrency <= 0 { + cfg.MaxConcurrency = 5 + } }