concurrent autoaccept

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>

Update services/frontend/pkg/config/config.go

Co-authored-by: Martin <github@diemattels.at>

align concurrency approach

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
This commit is contained in:
Jörn Friedrich Dreyer
2024-11-05 17:35:25 +01:00
parent 331e134398
commit 42711aef88
4 changed files with 68 additions and 37 deletions

View File

@@ -0,0 +1,5 @@
Enhancement: Concurrent autoaccept for shares
Shares for groups are now concurrently accepted. Tha default of 25 goroutinges can be changed with the new `FRONTEND_MAX_CONCURRENCY` environment variable.
https://github.com/owncloud/ocis/pull/10476

View File

@@ -3,7 +3,13 @@ package command
import (
"context"
"errors"
"sync"
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
group "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1"
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
@@ -17,16 +23,9 @@ import (
"github.com/owncloud/ocis/v2/ocis-pkg/registry"
"github.com/owncloud/ocis/v2/ocis-pkg/service/grpc"
"github.com/owncloud/ocis/v2/ocis-pkg/tracing"
settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0"
"github.com/owncloud/ocis/v2/services/frontend/pkg/config"
"github.com/owncloud/ocis/v2/services/settings/pkg/store/defaults"
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
group "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1"
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1"
settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0"
)
var _registeredEvents = []events.Unmarshaller{
@@ -90,7 +89,7 @@ func ListenForEvents(ctx context.Context, cfg *config.Config, l log.Logger) erro
default:
l.Error().Interface("event", e).Msg("unhandled event")
case events.ShareCreated:
AutoAcceptShares(ev, cfg.AutoAcceptShares, l, gatewaySelector, valueService, cfg.ServiceAccount)
AutoAcceptShares(ev, cfg.AutoAcceptShares, l, gatewaySelector, valueService, cfg.ServiceAccount, cfg.MaxConcurrency)
}
case <-ctx.Done():
l.Info().Msg("context cancelled")
@@ -100,7 +99,7 @@ func ListenForEvents(ctx context.Context, cfg *config.Config, l log.Logger) erro
}
// AutoAcceptShares automatically accepts shares if configured by the admin or user
func AutoAcceptShares(ev events.ShareCreated, autoAcceptDefault bool, l log.Logger, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], vs settingssvc.ValueService, cfg config.ServiceAccount) {
func AutoAcceptShares(ev events.ShareCreated, autoAcceptDefault bool, l log.Logger, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], vs settingssvc.ValueService, cfg config.ServiceAccount, maxConcurrency int) {
gwc, err := gatewaySelector.Next()
if err != nil {
l.Error().Err(err).Msg("cannot get gateway client")
@@ -118,36 +117,60 @@ func AutoAcceptShares(ev events.ShareCreated, autoAcceptDefault bool, l log.Logg
return
}
for _, uid := range userIDs {
if !autoAcceptShares(ctx, uid, autoAcceptDefault, vs) {
continue
}
work := make(chan *user.UserId, len(userIDs))
gwc, err := gatewaySelector.Next()
if err != nil {
l.Error().Err(err).Msg("cannot get gateway client")
continue
}
resp, err := gwc.UpdateReceivedShare(ctx, updateShareRequest(ev.ShareID, uid))
if err != nil {
l.Error().Err(err).Msg("error sending grpc request")
continue
// Distribute work
go func() {
defer close(work)
for _, userID := range userIDs {
select {
case work <- userID:
case <-ctx.Done():
return
}
}
}()
if code := resp.GetStatus().GetCode(); code != rpc.Code_CODE_OK {
// log unexpected status codes if a share cannot be accepted...
func() *zerolog.Event {
switch code {
// ... not found is not an error in the context of auto-accepting shares
case rpc.Code_CODE_NOT_FOUND:
return l.Debug()
default:
return l.Error()
wg := sync.WaitGroup{}
for i := 0; i < maxConcurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for userID := range work {
if !autoAcceptShares(ctx, userID, autoAcceptDefault, vs) {
continue
}
}().Interface("status", resp.GetStatus()).Str("userid", uid.GetOpaqueId()).Msg("unexpected status code while accepting share")
}
gwc, err := gatewaySelector.Next()
if err != nil {
l.Error().Err(err).Msg("cannot get gateway client")
continue
}
resp, err := gwc.UpdateReceivedShare(ctx, updateShareRequest(ev.ShareID, userID))
if err != nil {
l.Error().Err(err).Msg("error sending grpc request")
continue
}
if code := resp.GetStatus().GetCode(); code != rpc.Code_CODE_OK {
// log unexpected status codes if a share cannot be accepted...
func() *zerolog.Event {
switch code {
// ... not found is not an error in the context of auto-accepting shares
case rpc.Code_CODE_NOT_FOUND:
return l.Debug()
default:
return l.Error()
}
}().Interface("status", resp.GetStatus()).Str("userid", userID.GetOpaqueId()).Msg("unexpected status code while accepting share")
}
}
}()
}
// Wait for all goroutines to finish
wg.Wait()
}
func getUserIDs(ctx context.Context, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], uid *user.UserId, gid *group.GroupId) ([]*user.UserId, error) {

View File

@@ -38,8 +38,8 @@ type Config struct {
DisableSSE bool `yaml:"disable_sse" env:"OCIS_DISABLE_SSE;FRONTEND_DISABLE_SSE" desc:"When set to true, clients are informed that the Server-Sent Events endpoint is not accessible." introductionVersion:"pre5.0"`
DefaultLinkPermissions int `yaml:"default_link_permissions" env:"FRONTEND_DEFAULT_LINK_PERMISSIONS" desc:"Defines the default permissions a link is being created with. Possible values are 0 (= internal link, for instance members only) and 1 (= public link with viewer permissions). Defaults to 1." introductionVersion:"5.0"`
PublicURL string `yaml:"public_url" env:"OCIS_URL;FRONTEND_PUBLIC_URL" desc:"The public facing URL of the oCIS frontend." introductionVersion:"pre5.0"`
PublicURL string `yaml:"public_url" env:"OCIS_URL;FRONTEND_PUBLIC_URL" desc:"The public facing URL of the oCIS frontend." introductionVersion:"pre5.0"`
MaxConcurrency int `yaml:"max_concurrency" env:"OCIS_MAX_CONCURRENCY;FRONTEND_MAX_CONCURRENCY" desc:"Maximum number of concurrent go-routines. Higher values can potentially get work done faster but will also cause more load on the system. Values of 0 or below will be ignored and the default value will be used." introductionVersion:"7.0.0"`
AppHandler AppHandler `yaml:"app_handler"`
Archiver Archiver `yaml:"archiver"`
DataGateway DataGateway `yaml:"data_gateway"`

View File

@@ -130,6 +130,7 @@ func DefaultConfig() *config.Config {
Cluster: "ocis-cluster",
EnableTLS: false,
},
MaxConcurrency: 25,
PasswordPolicy: config.PasswordPolicy{
MinCharacters: 8,
MinLowerCaseCharacters: 1,
@@ -195,5 +196,7 @@ func EnsureDefaults(cfg *config.Config) {
// Sanitize sanitized the configuration
func Sanitize(cfg *config.Config) {
// nothing to sanitize here atm
if cfg.MaxConcurrency <= 0 {
cfg.MaxConcurrency = 5
}
}