From 9fea5c3002038367810052ff9b85f3368167feeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Thu, 28 Nov 2024 11:53:07 +0100 Subject: [PATCH 1/2] graph concurrent share listing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- .../graph-concurrent-share-listing.md | 5 + services/graph/pkg/config/config.go | 1 + .../pkg/config/defaults/defaultconfig.go | 1 + .../service/v0/api_driveitem_permissions.go | 9 +- .../v0/api_driveitem_permissions_test.go | 7 +- services/graph/pkg/service/v0/base.go | 357 ++++++++++++++---- 6 files changed, 293 insertions(+), 87 deletions(-) create mode 100644 changelog/unreleased/graph-concurrent-share-listing.md diff --git a/changelog/unreleased/graph-concurrent-share-listing.md b/changelog/unreleased/graph-concurrent-share-listing.md new file mode 100644 index 0000000000..9d60d47d91 --- /dev/null +++ b/changelog/unreleased/graph-concurrent-share-listing.md @@ -0,0 +1,5 @@ +Enhancement: Fetch shared resource metadata concurrently in graph + +We now concurrently stat shared resources when listing shares. + +https://github.com/owncloud/ocis/pull/10683 diff --git a/services/graph/pkg/config/config.go b/services/graph/pkg/config/config.go index 01e37f01b6..daa1071d28 100644 --- a/services/graph/pkg/config/config.go +++ b/services/graph/pkg/config/config.go @@ -31,6 +31,7 @@ type Config struct { IncludeOCMSharees bool `yaml:"include_ocm_sharees" env:"OCIS_ENABLE_OCM;GRAPH_INCLUDE_OCM_SHAREES" desc:"Include OCM sharees when listing users." introductionVersion:"5.0"` Events Events `yaml:"events"` UnifiedRoles UnifiedRoles `yaml:"unified_roles"` + MaxConcurrency int `yaml:"max_concurrency" env:"OCIS_MAX_CONCURRENCY;GRAPH_MAX_CONCURRENCY" desc:"The maximum number of concurrent requests the service will handle." 
introductionVersion:"7.0"` Keycloak Keycloak `yaml:"keycloak"` ServiceAccount ServiceAccount `yaml:"service_account"` diff --git a/services/graph/pkg/config/defaults/defaultconfig.go b/services/graph/pkg/config/defaults/defaultconfig.go index 8bcb692133..023dc1a65e 100644 --- a/services/graph/pkg/config/defaults/defaultconfig.go +++ b/services/graph/pkg/config/defaults/defaultconfig.go @@ -121,6 +121,7 @@ func DefaultConfig() *config.Config { Cluster: "ocis-cluster", EnableTLS: false, }, + MaxConcurrency: 20, UnifiedRoles: config.UnifiedRoles{ AvailableRoles: nil, // will be populated with defaults in EnsureDefaults }, diff --git a/services/graph/pkg/service/v0/api_driveitem_permissions.go b/services/graph/pkg/service/v0/api_driveitem_permissions.go index dfdb3567a1..eda3999dfb 100644 --- a/services/graph/pkg/service/v0/api_driveitem_permissions.go +++ b/services/graph/pkg/service/v0/api_driveitem_permissions.go @@ -378,7 +378,14 @@ func (s DriveItemPermissionsService) ListPermissions(ctx context.Context, itemID return collectionOfPermissions, nil } - driveItems := make(driveItemsByResourceID) + driveItems := make(driveItemsByResourceID, 1) + // we can use the statResponse to build the drive item before fetching the shares + item, err := cs3ResourceToDriveItem(s.logger, statResponse.GetInfo()) + if err != nil { + return collectionOfPermissions, err + } + driveItems[storagespace.FormatResourceID(statResponse.GetInfo().GetId())] = *item + if IsSpaceRoot(statResponse.GetInfo().GetId()) { permissions, err := s.getSpaceRootPermissions(ctx, statResponse.GetInfo().GetSpace().GetId()) if err != nil { diff --git a/services/graph/pkg/service/v0/api_driveitem_permissions_test.go b/services/graph/pkg/service/v0/api_driveitem_permissions_test.go index d08ea120bc..2761a23a5b 100644 --- a/services/graph/pkg/service/v0/api_driveitem_permissions_test.go +++ b/services/graph/pkg/service/v0/api_driveitem_permissions_test.go @@ -419,10 +419,11 @@ var _ = 
Describe("DriveItemPermissionsService", func() { OpaqueId: "public-share-id", }, Token: "public-share-token", + // the link shares the same resource id ResourceId: &provider.ResourceId{ - StorageId: "storageid", - SpaceId: "spaceid", - OpaqueId: "public-share-opaqueid", + StorageId: "1", + SpaceId: "2", + OpaqueId: "3", }, Permissions: &link.PublicSharePermissions{Permissions: roleconversions.NewViewerRole().CS3ResourcePermissions()}, }, diff --git a/services/graph/pkg/service/v0/base.go b/services/graph/pkg/service/v0/base.go index d670aff547..10b8488113 100644 --- a/services/graph/pkg/service/v0/base.go +++ b/services/graph/pkg/service/v0/base.go @@ -17,6 +17,7 @@ import ( storageprovider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" libregraph "github.com/owncloud/libre-graph-api-go" + "golang.org/x/sync/errgroup" "google.golang.org/protobuf/types/known/fieldmaskpb" "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" @@ -350,77 +351,204 @@ func (g BaseGraphService) listPublicShares(ctx context.Context, filters []*link. 
} func (g BaseGraphService) cs3UserSharesToDriveItems(ctx context.Context, shares []*collaboration.Share, driveItems driveItemsByResourceID) (driveItemsByResourceID, error) { - for _, s := range shares { - g.logger.Debug().Interface("CS3 UserShare", s).Msg("Got Share") - resIDStr := storagespace.FormatResourceID(s.ResourceId) - item, ok := driveItems[resIDStr] - if !ok { - itemptr, err := g.getDriveItem(ctx, &storageprovider.Reference{ResourceId: s.ResourceId}) - if err != nil { - g.logger.Debug().Err(err).Interface("Share", s.ResourceId).Msg("could not stat share, skipping") - continue - } - item = *itemptr - } + errg, ctx := errgroup.WithContext(ctx) - var condition string - switch { - case item.Root != nil: - condition = unifiedrole.UnifiedRoleConditionDrive - case item.Folder != nil: - condition = unifiedrole.UnifiedRoleConditionFolder - case item.File != nil: - condition = unifiedrole.UnifiedRoleConditionFile - } - perm, err := g.cs3UserShareToPermission(ctx, s, condition) - - var errcode errorcode.Error - switch { - case errors.As(err, &errcode) && errcode.GetCode() == errorcode.ItemNotFound: - // The Grantee couldn't be found (user/group does not exist anymore) - continue - case err != nil: - return driveItems, err - } - item.Permissions = append(item.Permissions, *perm) - driveItems[resIDStr] = item + // group shares by resource id + sharesByResource := make(map[string][]*collaboration.Share) + for _, share := range shares { + sharesByResource[share.GetResourceId().String()] = append(sharesByResource[share.GetResourceId().String()], share) } + + type resourceShares struct { + ResourceID *storageprovider.ResourceId + Shares []*collaboration.Share + } + + work := make(chan resourceShares, len(shares)) + results := make(chan *libregraph.DriveItem, len(shares)) + + // Distribute work + errg.Go(func() error { + defer close(work) + + for _, shares := range sharesByResource { + select { + case work <- resourceShares{ResourceID: shares[0].GetResourceId(), Shares: 
shares}: + case <-ctx.Done(): + return ctx.Err() + } + } + return nil + }) + + // Spawn workers that'll concurrently work the queue + numWorkers := g.config.MaxConcurrency + if len(sharesByResource) < numWorkers { + numWorkers = len(sharesByResource) + } + for i := 0; i < numWorkers; i++ { + errg.Go(func() error { + for sharesByResource := range work { + resIDStr := storagespace.FormatResourceID(sharesByResource.ResourceID) + // check if we already have the drive item in the map + item, ok := driveItems[resIDStr] + if !ok { + itemptr, err := g.getDriveItem(ctx, &storageprovider.Reference{ResourceId: sharesByResource.ResourceID}) + if err != nil { + g.logger.Debug().Err(err).Str("storage", sharesByResource.ResourceID.StorageId).Str("space", sharesByResource.ResourceID.SpaceId).Str("node", sharesByResource.ResourceID.OpaqueId).Msg("could not stat resource, skipping") + continue + } + item = *itemptr + } + + var condition string + switch { + case item.Root != nil: + condition = unifiedrole.UnifiedRoleConditionDrive + case item.Folder != nil: + condition = unifiedrole.UnifiedRoleConditionFolder + case item.File != nil: + condition = unifiedrole.UnifiedRoleConditionFile + } + for _, share := range sharesByResource.Shares { + perm, err := g.cs3UserShareToPermission(ctx, share, condition) + var errcode errorcode.Error + switch { + case errors.As(err, &errcode) && errcode.GetCode() == errorcode.ItemNotFound: + // The Grantee couldn't be found (user/group does not exist anymore) + continue + case err != nil: + return err + } + item.Permissions = append(item.Permissions, *perm) + } + if len(item.Permissions) == 0 { + continue + } + + select { + case results <- &item: + case <-ctx.Done(): + return ctx.Err() + } + } + return nil + }) + } + // Wait for things to settle down, then close results chan + go func() { + _ = errg.Wait() // error is checked later + close(results) + }() + + for item := range results { + driveItems[item.GetId()] = *item + } + + if err := errg.Wait(); err 
!= nil { + return nil, err + } + return driveItems, nil } + func (g BaseGraphService) cs3OCMSharesToDriveItems(ctx context.Context, shares []*ocm.Share, driveItems driveItemsByResourceID) (driveItemsByResourceID, error) { - for _, s := range shares { - g.logger.Debug().Interface("CS3 OCMShare", s).Msg("Got Share") - resIDStr := storagespace.FormatResourceID(s.ResourceId) - item, ok := driveItems[resIDStr] - if !ok { - itemptr, err := g.getDriveItem(ctx, &storageprovider.Reference{ResourceId: s.ResourceId}) - if err != nil { - g.logger.Debug().Err(err).Interface("Share", s.ResourceId).Msg("could not stat ocm share, skipping") - continue - } - item = *itemptr - } + errg, ctx := errgroup.WithContext(ctx) - var condition string - switch { - case item.Folder != nil: - condition = unifiedrole.UnifiedRoleConditionFolderFederatedUser - case item.File != nil: - condition = unifiedrole.UnifiedRoleConditionFileFederatedUser - } - perm, err := g.cs3OCMShareToPermission(ctx, s, condition) - - var errcode errorcode.Error - switch { - case errors.As(err, &errcode) && errcode.GetCode() == errorcode.ItemNotFound: - // The Grantee couldn't be found (user/group does not exist anymore) - continue - case err != nil: - return driveItems, err - } - item.Permissions = append(item.Permissions, *perm) - driveItems[resIDStr] = item + // group shares by resource id + sharesByResource := make(map[string][]*ocm.Share) + for _, share := range shares { + sharesByResource[share.GetResourceId().String()] = append(sharesByResource[share.GetResourceId().String()], share) } + + type resourceShares struct { + ResourceID *storageprovider.ResourceId + Shares []*ocm.Share + } + + work := make(chan resourceShares, len(shares)) + results := make(chan *libregraph.DriveItem, len(shares)) + + // Distribute work + errg.Go(func() error { + defer close(work) + + for _, shares := range sharesByResource { + select { + case work <- resourceShares{ResourceID: shares[0].GetResourceId(), Shares: shares}: + case 
<-ctx.Done(): + return ctx.Err() + } + } + return nil + }) + + // Spawn workers that'll concurrently work the queue + numWorkers := g.config.MaxConcurrency + if len(sharesByResource) < numWorkers { + numWorkers = len(sharesByResource) + } + for i := 0; i < numWorkers; i++ { + errg.Go(func() error { + for sharesByResource := range work { + resIDStr := storagespace.FormatResourceID(sharesByResource.ResourceID) + item, ok := driveItems[resIDStr] + if !ok { + itemptr, err := g.getDriveItem(ctx, &storageprovider.Reference{ResourceId: sharesByResource.ResourceID}) + if err != nil { + g.logger.Debug().Err(err).Interface("Share", sharesByResource.ResourceID).Msg("could not stat ocm share, skipping") + continue + } + item = *itemptr + } + + var condition string + switch { + case item.Folder != nil: + condition = unifiedrole.UnifiedRoleConditionFolderFederatedUser + case item.File != nil: + condition = unifiedrole.UnifiedRoleConditionFileFederatedUser + } + for _, share := range sharesByResource.Shares { + perm, err := g.cs3OCMShareToPermission(ctx, share, condition) + + var errcode errorcode.Error + switch { + case errors.As(err, &errcode) && errcode.GetCode() == errorcode.ItemNotFound: + // The Grantee couldn't be found (user/group does not exist anymore) + continue + case err != nil: + return err + } + item.Permissions = append(item.Permissions, *perm) + } + if len(item.Permissions) == 0 { + continue + } + + select { + case results <- &item: + case <-ctx.Done(): + return ctx.Err() + } + } + return nil + }) + } + // Wait for things to settle down, then close results chan + go func() { + _ = errg.Wait() // error is checked later + close(results) + }() + + for item := range results { + driveItems[item.GetId()] = *item + } + + if err := errg.Wait(); err != nil { + return nil, err + } + return driveItems, nil } @@ -593,26 +721,89 @@ func (g BaseGraphService) cs3OCMShareToPermission(ctx context.Context, share *oc } func (g BaseGraphService) cs3PublicSharesToDriveItems(ctx 
context.Context, shares []*link.PublicShare, driveItems driveItemsByResourceID) (driveItemsByResourceID, error) { - for _, s := range shares { - g.logger.Debug().Interface("CS3 PublicShare", s).Msg("Got Share") - resIDStr := storagespace.FormatResourceID(s.ResourceId) - item, ok := driveItems[resIDStr] - if !ok { - itemptr, err := g.getDriveItem(ctx, &storageprovider.Reference{ResourceId: s.ResourceId}) - if err != nil { - g.logger.Debug().Err(err).Interface("Share", s.ResourceId).Msg("could not stat share, skipping") - continue - } - item = *itemptr - } - perm, err := g.libreGraphPermissionFromCS3PublicShare(s) - if err != nil { - g.logger.Error().Err(err).Interface("Link", s.ResourceId).Msg("could not convert link to libregraph") - return driveItems, err - } + errg, ctx := errgroup.WithContext(ctx) - item.Permissions = append(item.Permissions, *perm) - driveItems[resIDStr] = item + // group shares by resource id + sharesByResource := make(map[string][]*link.PublicShare) + for _, share := range shares { + sharesByResource[share.GetResourceId().String()] = append(sharesByResource[share.GetResourceId().String()], share) + } + + type resourceShares struct { + ResourceID *storageprovider.ResourceId + Shares []*link.PublicShare + } + + work := make(chan resourceShares, len(shares)) + results := make(chan *libregraph.DriveItem, len(shares)) + + // Distribute work + errg.Go(func() error { + defer close(work) + + for _, shares := range sharesByResource { + select { + case work <- resourceShares{ResourceID: shares[0].GetResourceId(), Shares: shares}: + case <-ctx.Done(): + return ctx.Err() + } + } + return nil + }) + + // Spawn workers that'll concurrently work the queue + numWorkers := g.config.MaxConcurrency + if len(sharesByResource) < numWorkers { + numWorkers = len(sharesByResource) + } + for i := 0; i < numWorkers; i++ { + errg.Go(func() error { + for sharesByResource := range work { + resIDStr := storagespace.FormatResourceID(sharesByResource.ResourceID) + item, ok 
:= driveItems[resIDStr] + if !ok { + itemptr, err := g.getDriveItem(ctx, &storageprovider.Reference{ResourceId: sharesByResource.ResourceID}) + if err != nil { + g.logger.Debug().Err(err).Interface("Share", sharesByResource.ResourceID).Msg("could not stat share, skipping") + continue + } + item = *itemptr + } + for _, share := range sharesByResource.Shares { + + perm, err := g.libreGraphPermissionFromCS3PublicShare(share) + if err != nil { + g.logger.Error().Err(err).Interface("Link", sharesByResource.ResourceID).Msg("could not convert link to libregraph") + return err + } + + item.Permissions = append(item.Permissions, *perm) + } + if len(item.Permissions) == 0 { + continue + } + + select { + case results <- &item: + case <-ctx.Done(): + return ctx.Err() + } + } + return nil + }) + } + // Wait for things to settle down, then close results chan + go func() { + _ = errg.Wait() // error is checked later + close(results) + }() + + for item := range results { + driveItems[item.GetId()] = *item + } + + if err := errg.Wait(); err != nil { + return nil, err } return driveItems, nil From 01cc32deeca31c802594713e0686548a2c6780bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Thu, 28 Nov 2024 17:08:16 +0100 Subject: [PATCH 2/2] Update services/graph/pkg/config/config.go Co-authored-by: Martin --- services/graph/pkg/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/graph/pkg/config/config.go b/services/graph/pkg/config/config.go index daa1071d28..497f5ab053 100644 --- a/services/graph/pkg/config/config.go +++ b/services/graph/pkg/config/config.go @@ -31,7 +31,7 @@ type Config struct { IncludeOCMSharees bool `yaml:"include_ocm_sharees" env:"OCIS_ENABLE_OCM;GRAPH_INCLUDE_OCM_SHAREES" desc:"Include OCM sharees when listing users." 
introductionVersion:"5.0"` Events Events `yaml:"events"` UnifiedRoles UnifiedRoles `yaml:"unified_roles"` - MaxConcurrency int `yaml:"max_concurrency" env:"OCIS_MAX_CONCURRENCY;GRAPH_MAX_CONCURRENCY" desc:"The maximum number of concurrent requests the service will handle." introductionVersion:"7.0"` + MaxConcurrency int `yaml:"max_concurrency" env:"OCIS_MAX_CONCURRENCY;GRAPH_MAX_CONCURRENCY" desc:"The maximum number of concurrent requests the service will handle." introductionVersion:"7.0.0"` Keycloak Keycloak `yaml:"keycloak"` ServiceAccount ServiceAccount `yaml:"service_account"`