diff --git a/go.mod b/go.mod index 7f3a3b933..413c4f537 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/cenkalti/backoff v2.2.1+incompatible github.com/coreos/go-oidc/v3 v3.9.0 github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781 - github.com/cs3org/reva/v2 v2.19.3 + github.com/cs3org/reva/v2 v2.19.4 github.com/dhowden/tag v0.0.0-20230630033851-978a0926ee25 github.com/disintegration/imaging v1.6.2 github.com/dutchcoders/go-clamd v0.0.0-20170520113014-b970184f4d9e diff --git a/go.sum b/go.sum index fc49275ad..0cfa3eb2d 100644 --- a/go.sum +++ b/go.sum @@ -1019,8 +1019,8 @@ github.com/crewjam/saml v0.4.14 h1:g9FBNx62osKusnFzs3QTN5L9CVA/Egfgm+stJShzw/c= github.com/crewjam/saml v0.4.14/go.mod h1:UVSZCf18jJkk6GpWNVqcyQJMD5HsRugBPf4I1nl2mME= github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781 h1:BUdwkIlf8IS2FasrrPg8gGPHQPOrQ18MS1Oew2tmGtY= github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY= -github.com/cs3org/reva/v2 v2.19.3 h1:MKzUAa0HYuZog14dEg7TSBjT/0MtcOd2HZ4js/m7Gfg= -github.com/cs3org/reva/v2 v2.19.3/go.mod h1:GRUrOp5HbFVwZTgR9bVrMZ/MvVy+Jhxw1PdMmhhKP9E= +github.com/cs3org/reva/v2 v2.19.4 h1:gOcV6cgV+es624ckLUkXWL9mbHZpPXEgsa83/YA6WYA= +github.com/cs3org/reva/v2 v2.19.4/go.mod h1:GRUrOp5HbFVwZTgR9bVrMZ/MvVy+Jhxw1PdMmhhKP9E= github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4= github.com/cyphar/filepath-securejoin v0.2.4 h1:Ugdm7cg7i6ZK6x3xDF1oEu1nfkyfH53EtKeQYTC3kyg= github.com/cyphar/filepath-securejoin v0.2.4/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4= diff --git a/ocis/pkg/command/migrate.go b/ocis/pkg/command/migrate.go index 319cc8350..3ec250a06 100644 --- a/ocis/pkg/command/migrate.go +++ b/ocis/pkg/command/migrate.go @@ -106,11 +106,11 @@ func RebuildJSONCS3Indexes(cfg *config.Config) *cli.Command { if err != nil { return err } - gc, err := pool.GetGatewayServiceClient(conf.GatewayAddr) + gatewaySelector, err := pool.GatewaySelector(conf.GatewayAddr) if err != nil { return err } - mgr, err := jsoncs3.New(s, gc, 0, nil, 1) + mgr, err := jsoncs3.New(s, gatewaySelector, 0, nil, 1) if err != nil { return err } diff --git a/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/jsoncs3.go b/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/jsoncs3.go index be1629119..edd5e00b2 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/jsoncs3.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/jsoncs3.go @@ -29,14 +29,6 @@ import ( rpcv1beta1 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" - "github.com/google/uuid" - "github.com/mitchellh/mapstructure" - "github.com/pkg/errors" - "github.com/rs/zerolog/log" - "go.opentelemetry.io/otel/codes" - "golang.org/x/sync/errgroup" - "google.golang.org/genproto/protobuf/field_mask" - "github.com/cs3org/reva/v2/pkg/appctx" ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" "github.com/cs3org/reva/v2/pkg/errtypes" @@ -52,6 +44,13 @@ import ( "github.com/cs3org/reva/v2/pkg/storage/utils/metadata" // nolint:staticcheck // we need the legacy package to convert V1 to V2 messages "github.com/cs3org/reva/v2/pkg/storagespace" "github.com/cs3org/reva/v2/pkg/utils" + "github.com/google/uuid" + "github.com/mitchellh/mapstructure" + "github.com/pkg/errors" + "go.opentelemetry.io/otel/codes" + "golang.org/x/sync/errgroup" + "google.golang.org/genproto/protobuf/field_mask" + "google.golang.org/protobuf/types/known/fieldmaskpb" ) /* @@ -153,8 +152,8 @@ type Manager struct { MaxConcurrency int - gateway gatewayv1beta1.GatewayAPIClient - eventStream events.Stream + gatewaySelector pool.Selectable[gatewayv1beta1.GatewayAPIClient] + eventStream events.Stream } // NewDefault returns a new manager instance with default dependencies @@ -170,7 +169,7 @@ func NewDefault(m map[string]interface{}) (share.Manager, error) { return nil, err } - gc, err := pool.GetGatewayServiceClient(c.GatewayAddr) + gatewaySelector, err := pool.GatewaySelector(c.GatewayAddr) if err != nil { return nil, err } @@ -183,11 +182,11 @@ func NewDefault(m map[string]interface{}) (share.Manager, error) { } } - return New(s, gc, c.CacheTTL, es, c.MaxConcurrency) + return New(s, gatewaySelector, c.CacheTTL, es, c.MaxConcurrency) } // New returns a new manager instance. -func New(s metadata.Storage, gc gatewayv1beta1.GatewayAPIClient, ttlSeconds int, es events.Stream, maxconcurrency int) (*Manager, error) { +func New(s metadata.Storage, gatewaySelector pool.Selectable[gatewayv1beta1.GatewayAPIClient], ttlSeconds int, es events.Stream, maxconcurrency int) (*Manager, error) { ttl := time.Duration(ttlSeconds) * time.Second return &Manager{ Cache: providercache.New(s, ttl), @@ -195,7 +194,7 @@ func New(s metadata.Storage, gc gatewayv1beta1.GatewayAPIClient, ttlSeconds int, UserReceivedStates: receivedsharecache.New(s, ttl), GroupReceivedCache: sharecache.New(s, "groups", "received.json", ttl), storage: s, - gateway: gc, + gatewaySelector: gatewaySelector, eventStream: es, MaxConcurrency: maxconcurrency, }, nil @@ -411,6 +410,7 @@ func (m *Manager) get(ctx context.Context, ref *collaboration.ShareReference) (s func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReference) (*collaboration.Share, error) { ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "GetShare") defer span.End() + sublog := appctx.GetLogger(ctx).With().Str("id", ref.GetId().GetOpaqueId()).Str("key", ref.GetKey().String()).Str("driver", "jsoncs3").Str("handler", "GetShare").Logger() if err := m.initialize(ctx); err != nil { return nil, err } @@ -421,7 +421,7 @@ func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReferenc } if share.IsExpired(s) { if err := m.removeShare(ctx, s); err != nil { - log.Error().Err(err). + sublog.Error().Err(err). Msg("failed to unshare expired share") } if err := events.Publish(ctx, m.eventStream, events.ShareExpired{ @@ -432,7 +432,7 @@ func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReferenc GranteeUserID: s.GetGrantee().GetUserId(), GranteeGroupID: s.GetGrantee().GetGroupId(), }); err != nil { - log.Error().Err(err). + sublog.Error().Err(err). Msg("failed to publish share expired event") } } @@ -445,8 +445,15 @@ func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReferenc req := &provider.StatRequest{ Ref: &provider.Reference{ResourceId: s.ResourceId}, + FieldMask: &fieldmaskpb.FieldMask{ + Paths: []string{"permissions"}, + }, } - res, err := m.gateway.Stat(ctx, req) + client, err := m.gatewaySelector.Next() + if err != nil { + return nil, err + } + res, err := client.Stat(ctx, req) if err == nil && res.Status.Code == rpcv1beta1.Code_CODE_OK && res.Info.PermissionSet.ListGrants { @@ -523,8 +530,15 @@ func (m *Manager) UpdateShare(ctx context.Context, ref *collaboration.ShareRefer if !share.IsCreatedByUser(toUpdate, user) { req := &provider.StatRequest{ Ref: &provider.Reference{ResourceId: toUpdate.ResourceId}, + FieldMask: &fieldmaskpb.FieldMask{ + Paths: []string{"permissions"}, + }, } - res, err := m.gateway.Stat(ctx, req) + client, err := m.gatewaySelector.Next() + if err != nil { + return nil, err + } + res, err := client.Stat(ctx, req) if err != nil || res.Status.Code != rpcv1beta1.Code_CODE_OK || !res.Info.PermissionSet.UpdateGrant { @@ -583,6 +597,7 @@ func (m *Manager) ListShares(ctx context.Context, filters []*collaboration.Filte func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, filters []*collaboration.Filter) ([]*collaboration.Share, error) { ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "listSharesByIDs") defer span.End() + sublog := appctx.GetLogger(ctx).With().Str("userid", user.GetId().GetOpaqueId()).Str("useridp", user.GetId().GetIdp()).Str("driver", "jsoncs3").Str("handler", "listSharesByIDs").Logger() providerSpaces := make(map[string]map[string]struct{}) for _, f := range share.FilterFiltersByType(filters, collaboration.Filter_TYPE_RESOURCE_ID) { @@ -604,19 +619,21 @@ func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, f } for _, s := range shares.Shares { + resourceID := s.GetResourceId() + sublog = sublog.With().Str("storageid", resourceID.GetStorageId()).Str("spaceid", resourceID.GetSpaceId()).Str("opaqueid", resourceID.GetOpaqueId()).Logger() if share.IsExpired(s) { if err := m.removeShare(ctx, s); err != nil { - log.Error().Err(err). + sublog.Error().Err(err). Msg("failed to unshare expired share") } if err := events.Publish(ctx, m.eventStream, events.ShareExpired{ ShareOwner: s.GetOwner(), - ItemID: s.GetResourceId(), + ItemID: resourceID, ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())), GranteeUserID: s.GetGrantee().GetUserId(), GranteeGroupID: s.GetGrantee().GetGroupId(), }); err != nil { - log.Error().Err(err). + sublog.Error().Err(err). Msg("failed to publish share expired event") } continue @@ -626,17 +643,33 @@ func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, f } if !(share.IsCreatedByUser(s, user) || share.IsGrantedToUser(s, user)) { - key := storagespace.FormatResourceID(*s.ResourceId) + key := storagespace.FormatResourceID(*resourceID) if _, hit := statCache[key]; !hit { req := &provider.StatRequest{ - Ref: &provider.Reference{ResourceId: s.ResourceId}, + Ref: &provider.Reference{ResourceId: resourceID}, + FieldMask: &fieldmaskpb.FieldMask{ + Paths: []string{"permissions"}, + }, } - res, err := m.gateway.Stat(ctx, req) - if err != nil || - res.Status.Code != rpcv1beta1.Code_CODE_OK || - !res.Info.PermissionSet.ListGrants { + client, err := m.gatewaySelector.Next() + if err != nil { + sublog.Error().Err(err).Msg("failed to select next gateway client") continue } + res, err := client.Stat(ctx, req) + if err != nil { + sublog.Error().Err(err).Msg("failed to make stat call") + continue + } + if res.Status.Code != rpcv1beta1.Code_CODE_OK { + sublog.Debug().Str("code", res.GetStatus().GetCode().String()).Msg(res.GetStatus().GetMessage()) + continue + } + if !res.Info.PermissionSet.ListGrants { + sublog.Debug().Msg("user has no list grants permission") + continue + } + sublog.Debug().Msg("listing share for non participating user") statCache[key] = struct{}{} } } @@ -652,6 +685,7 @@ func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, f func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User, filters []*collaboration.Filter) ([]*collaboration.Share, error) { ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "listCreatedShares") defer span.End() + sublog := appctx.GetLogger(ctx).With().Str("userid", user.GetId().GetOpaqueId()).Str("useridp", user.GetId().GetIdp()).Str("driver", "jsoncs3").Str("handler", "listCreatedShares").Logger() list, err := m.CreatedCache.List(ctx, user.Id.OpaqueId) if err != nil { @@ -694,7 +728,7 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User, // fetch all shares from space with one request _, err := m.Cache.ListSpace(ctx, storageID, spaceID) if err != nil { - log.Error().Err(err). + sublog.Error().Err(err). Str("storageid", storageID). Str("spaceid", spaceID). Msg("failed to list shares in space") @@ -707,7 +741,7 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User, } if share.IsExpired(s) { if err := m.removeShare(ctx, s); err != nil { - log.Error().Err(err). + sublog.Error().Err(err). Msg("failed to unshare expired share") } if err := events.Publish(ctx, m.eventStream, events.ShareExpired{ @@ -717,7 +751,7 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User, GranteeUserID: s.GetGrantee().GetUserId(), GranteeGroupID: s.GetGrantee().GetGroupId(), }); err != nil { - log.Error().Err(err). + sublog.Error().Err(err). Msg("failed to publish share expired event") } continue @@ -762,6 +796,7 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User, func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaboration.Filter, forUser *userv1beta1.UserId) ([]*collaboration.ReceivedShare, error) { ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "ListReceivedShares") defer span.End() + sublog := appctx.GetLogger(ctx).With().Str("driver", "jsoncs3").Str("handler", "ListReceivedShares").Logger() if err := m.initialize(ctx); err != nil { return nil, err @@ -769,7 +804,11 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati user := ctxpkg.ContextMustGetUser(ctx) if user.GetId().GetType() == userv1beta1.UserType_USER_TYPE_SERVICE { - u, err := utils.GetUser(forUser, m.gateway) + client, err := m.gatewaySelector.Next() + if err != nil { + return nil, err + } + u, err := utils.GetUser(forUser, client) if err != nil { return nil, errtypes.BadRequest("user not found") } @@ -852,12 +891,11 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati g.Go(func() error { for w := range work { storageID, spaceID, _ := shareid.Decode(w.ssid) + sublogr := sublog.With().Str("storageid", storageID).Str("spaceid", spaceID).Logger() // fetch all shares from space with one request _, err := m.Cache.ListSpace(ctx, storageID, spaceID) if err != nil { - log.Error().Err(err). - Str("storageid", storageID). - Str("spaceid", spaceID). + sublogr.Error().Err(err). Msg("failed to list shares in space") continue } @@ -866,9 +904,10 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati if err != nil || s == nil { continue } + sublogr = sublogr.With().Str("shareid", shareID).Logger() if share.IsExpired(s) { if err := m.removeShare(ctx, s); err != nil { - log.Error().Err(err). + sublogr.Error().Err(err). Msg("failed to unshare expired share") } if err := events.Publish(ctx, m.eventStream, events.ShareExpired{ @@ -878,7 +917,7 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati GranteeUserID: s.GetGrantee().GetUserId(), GranteeGroupID: s.GetGrantee().GetGroupId(), }); err != nil { - log.Error().Err(err). + sublogr.Error().Err(err). Msg("failed to publish share expired event") } continue @@ -959,6 +998,7 @@ func (m *Manager) GetReceivedShare(ctx context.Context, ref *collaboration.Share func (m *Manager) getReceived(ctx context.Context, ref *collaboration.ShareReference) (*collaboration.ReceivedShare, error) { ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "getReceived") defer span.End() + sublog := appctx.GetLogger(ctx).With().Str("id", ref.GetId().GetOpaqueId()).Str("key", ref.GetKey().String()).Str("driver", "jsoncs3").Str("handler", "getReceived").Logger() s, err := m.get(ctx, ref) if err != nil { @@ -970,7 +1010,7 @@ func (m *Manager) getReceived(ctx context.Context, ref *collaboration.ShareRefer } if share.IsExpired(s) { if err := m.removeShare(ctx, s); err != nil { - log.Error().Err(err). + sublog.Error().Err(err). Msg("failed to unshare expired share") } if err := events.Publish(ctx, m.eventStream, events.ShareExpired{ @@ -980,7 +1020,7 @@ func (m *Manager) getReceived(ctx context.Context, ref *collaboration.ShareRefer GranteeUserID: s.GetGrantee().GetUserId(), GranteeGroupID: s.GetGrantee().GetGroupId(), }); err != nil { - log.Error().Err(err). + sublog.Error().Err(err). Msg("failed to publish share expired event") } } diff --git a/vendor/modules.txt b/vendor/modules.txt index aec61e5e3..710ff9354 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -359,7 +359,7 @@ github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1 github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1 github.com/cs3org/go-cs3apis/cs3/tx/v1beta1 github.com/cs3org/go-cs3apis/cs3/types/v1beta1 -# github.com/cs3org/reva/v2 v2.19.3 +# github.com/cs3org/reva/v2 v2.19.4 ## explicit; go 1.21 github.com/cs3org/reva/v2/cmd/revad/internal/grace github.com/cs3org/reva/v2/cmd/revad/runtime