Merge pull request #10683 from owncloud/graph-concurrent-share-listing

graph concurrent share listing
This commit is contained in:
kobergj
2024-12-02 16:50:55 +01:00
committed by GitHub
6 changed files with 293 additions and 87 deletions

View File

@@ -0,0 +1,5 @@
Enhancement: fetch shared resource metadata concurrently in graph
We now stat shared resources concurrently when listing shares.
https://github.com/owncloud/ocis/pull/10683

View File

@@ -31,6 +31,7 @@ type Config struct {
IncludeOCMSharees bool `yaml:"include_ocm_sharees" env:"OCIS_ENABLE_OCM;GRAPH_INCLUDE_OCM_SHAREES" desc:"Include OCM sharees when listing users." introductionVersion:"5.0"`
Events Events `yaml:"events"`
UnifiedRoles UnifiedRoles `yaml:"unified_roles"`
MaxConcurrency int `yaml:"max_concurrency" env:"OCIS_MAX_CONCURRENCY;GRAPH_MAX_CONCURRENCY" desc:"The maximum number of concurrent requests the service will handle." introductionVersion:"7.0.0"`
Keycloak Keycloak `yaml:"keycloak"`
ServiceAccount ServiceAccount `yaml:"service_account"`

View File

@@ -121,6 +121,7 @@ func DefaultConfig() *config.Config {
Cluster: "ocis-cluster",
EnableTLS: false,
},
MaxConcurrency: 20,
UnifiedRoles: config.UnifiedRoles{
AvailableRoles: nil, // will be populated with defaults in EnsureDefaults
},

View File

@@ -378,7 +378,14 @@ func (s DriveItemPermissionsService) ListPermissions(ctx context.Context, itemID
return collectionOfPermissions, nil
}
driveItems := make(driveItemsByResourceID)
driveItems := make(driveItemsByResourceID, 1)
// We can use the statResponse to build the drive item before fetching the shares.
item, err := cs3ResourceToDriveItem(s.logger, statResponse.GetInfo())
if err != nil {
return collectionOfPermissions, err
}
driveItems[storagespace.FormatResourceID(statResponse.GetInfo().GetId())] = *item
if IsSpaceRoot(statResponse.GetInfo().GetId()) {
permissions, err := s.getSpaceRootPermissions(ctx, statResponse.GetInfo().GetSpace().GetId())
if err != nil {

View File

@@ -419,10 +419,11 @@ var _ = Describe("DriveItemPermissionsService", func() {
OpaqueId: "public-share-id",
},
Token: "public-share-token",
// The link shares the same resource ID.
ResourceId: &provider.ResourceId{
StorageId: "storageid",
SpaceId: "spaceid",
OpaqueId: "public-share-opaqueid",
StorageId: "1",
SpaceId: "2",
OpaqueId: "3",
},
Permissions: &link.PublicSharePermissions{Permissions: roleconversions.NewViewerRole().CS3ResourcePermissions()},
},

View File

@@ -17,6 +17,7 @@ import (
storageprovider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
libregraph "github.com/owncloud/libre-graph-api-go"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/types/known/fieldmaskpb"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
@@ -350,77 +351,204 @@ func (g BaseGraphService) listPublicShares(ctx context.Context, filters []*link.
}
func (g BaseGraphService) cs3UserSharesToDriveItems(ctx context.Context, shares []*collaboration.Share, driveItems driveItemsByResourceID) (driveItemsByResourceID, error) {
for _, s := range shares {
g.logger.Debug().Interface("CS3 UserShare", s).Msg("Got Share")
resIDStr := storagespace.FormatResourceID(s.ResourceId)
item, ok := driveItems[resIDStr]
if !ok {
itemptr, err := g.getDriveItem(ctx, &storageprovider.Reference{ResourceId: s.ResourceId})
if err != nil {
g.logger.Debug().Err(err).Interface("Share", s.ResourceId).Msg("could not stat share, skipping")
continue
}
item = *itemptr
}
errg, ctx := errgroup.WithContext(ctx)
var condition string
switch {
case item.Root != nil:
condition = unifiedrole.UnifiedRoleConditionDrive
case item.Folder != nil:
condition = unifiedrole.UnifiedRoleConditionFolder
case item.File != nil:
condition = unifiedrole.UnifiedRoleConditionFile
}
perm, err := g.cs3UserShareToPermission(ctx, s, condition)
var errcode errorcode.Error
switch {
case errors.As(err, &errcode) && errcode.GetCode() == errorcode.ItemNotFound:
// The Grantee couldn't be found (user/group does not exist anymore)
continue
case err != nil:
return driveItems, err
}
item.Permissions = append(item.Permissions, *perm)
driveItems[resIDStr] = item
// group shares by resource id
sharesByResource := make(map[string][]*collaboration.Share)
for _, share := range shares {
sharesByResource[share.GetResourceId().String()] = append(sharesByResource[share.GetResourceId().String()], share)
}
type resourceShares struct {
ResourceID *storageprovider.ResourceId
Shares []*collaboration.Share
}
work := make(chan resourceShares, len(shares))
results := make(chan *libregraph.DriveItem, len(shares))
// Distribute work
errg.Go(func() error {
defer close(work)
for _, shares := range sharesByResource {
select {
case work <- resourceShares{ResourceID: shares[0].GetResourceId(), Shares: shares}:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
})
// Spawn workers that concurrently process the work queue.
numWorkers := g.config.MaxConcurrency
if len(sharesByResource) < numWorkers {
numWorkers = len(sharesByResource)
}
for i := 0; i < numWorkers; i++ {
errg.Go(func() error {
for sharesByResource := range work {
resIDStr := storagespace.FormatResourceID(sharesByResource.ResourceID)
// check if we already have the drive item in the map
item, ok := driveItems[resIDStr]
if !ok {
itemptr, err := g.getDriveItem(ctx, &storageprovider.Reference{ResourceId: sharesByResource.ResourceID})
if err != nil {
g.logger.Debug().Err(err).Str("storage", sharesByResource.ResourceID.StorageId).Str("space", sharesByResource.ResourceID.SpaceId).Str("node", sharesByResource.ResourceID.OpaqueId).Msg("could not stat resource, skipping")
continue
}
item = *itemptr
}
var condition string
switch {
case item.Root != nil:
condition = unifiedrole.UnifiedRoleConditionDrive
case item.Folder != nil:
condition = unifiedrole.UnifiedRoleConditionFolder
case item.File != nil:
condition = unifiedrole.UnifiedRoleConditionFile
}
for _, share := range sharesByResource.Shares {
perm, err := g.cs3UserShareToPermission(ctx, share, condition)
var errcode errorcode.Error
switch {
case errors.As(err, &errcode) && errcode.GetCode() == errorcode.ItemNotFound:
// The Grantee couldn't be found (user/group does not exist anymore)
continue
case err != nil:
return err
}
item.Permissions = append(item.Permissions, *perm)
}
if len(item.Permissions) == 0 {
continue
}
select {
case results <- &item:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
})
}
// Wait for all workers to finish, then close the results channel.
go func() {
_ = errg.Wait() // error is checked later
close(results)
}()
for item := range results {
driveItems[item.GetId()] = *item
}
if err := errg.Wait(); err != nil {
return nil, err
}
return driveItems, nil
}
func (g BaseGraphService) cs3OCMSharesToDriveItems(ctx context.Context, shares []*ocm.Share, driveItems driveItemsByResourceID) (driveItemsByResourceID, error) {
for _, s := range shares {
g.logger.Debug().Interface("CS3 OCMShare", s).Msg("Got Share")
resIDStr := storagespace.FormatResourceID(s.ResourceId)
item, ok := driveItems[resIDStr]
if !ok {
itemptr, err := g.getDriveItem(ctx, &storageprovider.Reference{ResourceId: s.ResourceId})
if err != nil {
g.logger.Debug().Err(err).Interface("Share", s.ResourceId).Msg("could not stat ocm share, skipping")
continue
}
item = *itemptr
}
errg, ctx := errgroup.WithContext(ctx)
var condition string
switch {
case item.Folder != nil:
condition = unifiedrole.UnifiedRoleConditionFolderFederatedUser
case item.File != nil:
condition = unifiedrole.UnifiedRoleConditionFileFederatedUser
}
perm, err := g.cs3OCMShareToPermission(ctx, s, condition)
var errcode errorcode.Error
switch {
case errors.As(err, &errcode) && errcode.GetCode() == errorcode.ItemNotFound:
// The Grantee couldn't be found (user/group does not exist anymore)
continue
case err != nil:
return driveItems, err
}
item.Permissions = append(item.Permissions, *perm)
driveItems[resIDStr] = item
// group shares by resource id
sharesByResource := make(map[string][]*ocm.Share)
for _, share := range shares {
sharesByResource[share.GetResourceId().String()] = append(sharesByResource[share.GetResourceId().String()], share)
}
type resourceShares struct {
ResourceID *storageprovider.ResourceId
Shares []*ocm.Share
}
work := make(chan resourceShares, len(shares))
results := make(chan *libregraph.DriveItem, len(shares))
// Distribute work
errg.Go(func() error {
defer close(work)
for _, shares := range sharesByResource {
select {
case work <- resourceShares{ResourceID: shares[0].GetResourceId(), Shares: shares}:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
})
// Spawn workers that concurrently process the work queue.
numWorkers := g.config.MaxConcurrency
if len(sharesByResource) < numWorkers {
numWorkers = len(sharesByResource)
}
for i := 0; i < numWorkers; i++ {
errg.Go(func() error {
for sharesByResource := range work {
resIDStr := storagespace.FormatResourceID(sharesByResource.ResourceID)
item, ok := driveItems[resIDStr]
if !ok {
itemptr, err := g.getDriveItem(ctx, &storageprovider.Reference{ResourceId: sharesByResource.ResourceID})
if err != nil {
g.logger.Debug().Err(err).Interface("Share", sharesByResource.ResourceID).Msg("could not stat ocm share, skipping")
continue
}
item = *itemptr
}
var condition string
switch {
case item.Folder != nil:
condition = unifiedrole.UnifiedRoleConditionFolderFederatedUser
case item.File != nil:
condition = unifiedrole.UnifiedRoleConditionFileFederatedUser
}
for _, share := range sharesByResource.Shares {
perm, err := g.cs3OCMShareToPermission(ctx, share, condition)
var errcode errorcode.Error
switch {
case errors.As(err, &errcode) && errcode.GetCode() == errorcode.ItemNotFound:
// The Grantee couldn't be found (user/group does not exist anymore)
continue
case err != nil:
return err
}
item.Permissions = append(item.Permissions, *perm)
}
if len(item.Permissions) == 0 {
continue
}
select {
case results <- &item:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
})
}
// Wait for all workers to finish, then close the results channel.
go func() {
_ = errg.Wait() // error is checked later
close(results)
}()
for item := range results {
driveItems[item.GetId()] = *item
}
if err := errg.Wait(); err != nil {
return nil, err
}
return driveItems, nil
}
@@ -593,26 +721,89 @@ func (g BaseGraphService) cs3OCMShareToPermission(ctx context.Context, share *oc
}
func (g BaseGraphService) cs3PublicSharesToDriveItems(ctx context.Context, shares []*link.PublicShare, driveItems driveItemsByResourceID) (driveItemsByResourceID, error) {
for _, s := range shares {
g.logger.Debug().Interface("CS3 PublicShare", s).Msg("Got Share")
resIDStr := storagespace.FormatResourceID(s.ResourceId)
item, ok := driveItems[resIDStr]
if !ok {
itemptr, err := g.getDriveItem(ctx, &storageprovider.Reference{ResourceId: s.ResourceId})
if err != nil {
g.logger.Debug().Err(err).Interface("Share", s.ResourceId).Msg("could not stat share, skipping")
continue
}
item = *itemptr
}
perm, err := g.libreGraphPermissionFromCS3PublicShare(s)
if err != nil {
g.logger.Error().Err(err).Interface("Link", s.ResourceId).Msg("could not convert link to libregraph")
return driveItems, err
}
errg, ctx := errgroup.WithContext(ctx)
item.Permissions = append(item.Permissions, *perm)
driveItems[resIDStr] = item
// group shares by resource id
sharesByResource := make(map[string][]*link.PublicShare)
for _, share := range shares {
sharesByResource[share.GetResourceId().String()] = append(sharesByResource[share.GetResourceId().String()], share)
}
type resourceShares struct {
ResourceID *storageprovider.ResourceId
Shares []*link.PublicShare
}
work := make(chan resourceShares, len(shares))
results := make(chan *libregraph.DriveItem, len(shares))
// Distribute work
errg.Go(func() error {
defer close(work)
for _, shares := range sharesByResource {
select {
case work <- resourceShares{ResourceID: shares[0].GetResourceId(), Shares: shares}:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
})
// Spawn workers that concurrently process the work queue.
numWorkers := g.config.MaxConcurrency
if len(sharesByResource) < numWorkers {
numWorkers = len(sharesByResource)
}
for i := 0; i < numWorkers; i++ {
errg.Go(func() error {
for sharesByResource := range work {
resIDStr := storagespace.FormatResourceID(sharesByResource.ResourceID)
item, ok := driveItems[resIDStr]
if !ok {
itemptr, err := g.getDriveItem(ctx, &storageprovider.Reference{ResourceId: sharesByResource.ResourceID})
if err != nil {
g.logger.Debug().Err(err).Interface("Share", sharesByResource.ResourceID).Msg("could not stat share, skipping")
continue
}
item = *itemptr
}
for _, share := range sharesByResource.Shares {
perm, err := g.libreGraphPermissionFromCS3PublicShare(share)
if err != nil {
g.logger.Error().Err(err).Interface("Link", sharesByResource.ResourceID).Msg("could not convert link to libregraph")
return err
}
item.Permissions = append(item.Permissions, *perm)
}
if len(item.Permissions) == 0 {
continue
}
select {
case results <- &item:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
})
}
// Wait for all workers to finish, then close the results channel.
go func() {
_ = errg.Wait() // error is checked later
close(results)
}()
for item := range results {
driveItems[item.GetId()] = *item
}
if err := errg.Wait(); err != nil {
return nil, err
}
return driveItems, nil