diff --git a/changelog/unreleased/sharing-sse.md b/changelog/unreleased/sharing-sse.md new file mode 100644 index 000000000..dfc859c88 --- /dev/null +++ b/changelog/unreleased/sharing-sse.md @@ -0,0 +1,5 @@ +Enhancement: Sharing SSEs + +Added server side events for item moved, share created/updated/removed, space membership created/removed. + +https://github.com/owncloud/ocis/pull/8854 diff --git a/go.mod b/go.mod index 57331e69e..4091b81b9 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/cenkalti/backoff v2.2.1+incompatible github.com/coreos/go-oidc/v3 v3.10.0 github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781 - github.com/cs3org/reva/v2 v2.19.2-0.20240405190914-ef59ba20ef0e + github.com/cs3org/reva/v2 v2.19.2-0.20240415154646-5c2c9831a2b7 github.com/dhowden/tag v0.0.0-20230630033851-978a0926ee25 github.com/disintegration/imaging v1.6.2 github.com/dutchcoders/go-clamd v0.0.0-20170520113014-b970184f4d9e diff --git a/go.sum b/go.sum index 6de6fa1c2..aa4692d1a 100644 --- a/go.sum +++ b/go.sum @@ -1022,8 +1022,8 @@ github.com/crewjam/saml v0.4.14 h1:g9FBNx62osKusnFzs3QTN5L9CVA/Egfgm+stJShzw/c= github.com/crewjam/saml v0.4.14/go.mod h1:UVSZCf18jJkk6GpWNVqcyQJMD5HsRugBPf4I1nl2mME= github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781 h1:BUdwkIlf8IS2FasrrPg8gGPHQPOrQ18MS1Oew2tmGtY= github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY= -github.com/cs3org/reva/v2 v2.19.2-0.20240405190914-ef59ba20ef0e h1:RAmS/42ZYRrkhvVLWgvJhwjna9zSAqa2DJ8xS7R3Rx0= -github.com/cs3org/reva/v2 v2.19.2-0.20240405190914-ef59ba20ef0e/go.mod h1:GRUrOp5HbFVwZTgR9bVrMZ/MvVy+Jhxw1PdMmhhKP9E= +github.com/cs3org/reva/v2 v2.19.2-0.20240415154646-5c2c9831a2b7 h1:7oBqhyPUyWSRNMKG5wLYDm9TSc0gfKj+LeVaUhjm+hQ= +github.com/cs3org/reva/v2 v2.19.2-0.20240415154646-5c2c9831a2b7/go.mod h1:GRUrOp5HbFVwZTgR9bVrMZ/MvVy+Jhxw1PdMmhhKP9E= github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4= github.com/cyphar/filepath-securejoin v0.2.4 h1:Ugdm7cg7i6ZK6x3xDF1oEu1nfkyfH53EtKeQYTC3kyg= github.com/cyphar/filepath-securejoin v0.2.4/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4= diff --git a/services/clientlog/pkg/command/server.go b/services/clientlog/pkg/command/server.go index f3cab7ada..7c41e7a36 100644 --- a/services/clientlog/pkg/command/server.go +++ b/services/clientlog/pkg/command/server.go @@ -33,6 +33,12 @@ var _registeredEvents = []events.Unmarshaller{ events.FileLocked{}, events.FileUnlocked{}, events.FileTouched{}, + events.SpaceShared{}, + events.SpaceShareUpdated{}, + events.SpaceUnshared{}, + events.ShareCreated{}, + events.ShareRemoved{}, + events.ShareUpdated{}, } // Server is the entrypoint for the server command. diff --git a/services/clientlog/pkg/service/service.go b/services/clientlog/pkg/service/service.go index a07fe6402..ab7bf7cc4 100644 --- a/services/clientlog/pkg/service/service.go +++ b/services/clientlog/pkg/service/service.go @@ -8,6 +8,8 @@ import ( "reflect" gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" + group "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1" + user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "go.opentelemetry.io/otel/trace" @@ -90,11 +92,6 @@ func (cl *ClientlogService) processEvent(event events.Event) { return } - gwc, err = cl.gatewaySelector.Next() - if err != nil { - cl.log.Error().Err(err).Interface("event", event).Msg("error getting gateway client") - return - } var ( users []string evType string @@ -148,17 +145,37 @@ func (cl *ClientlogService) processEvent(event events.Event) { case events.ContainerCreated: p("folder-created", e.Ref) case events.ItemMoved: - // we are only interested in the rename case - if !utils.ResourceIDEqual(e.OldReference.GetResourceId(), e.Ref.GetResourceId()) || e.Ref.GetPath() == e.OldReference.GetPath() { - return + // we send a dedicated event in case the item was only renamed + if utils.ResourceIDEqual(e.OldReference.GetResourceId(), e.Ref.GetResourceId()) || e.Ref.GetPath() == e.OldReference.GetPath() { + p("item-renamed", e.Ref) + } else { + p("item-moved", e.Ref) } - p("item-renamed", e.Ref) case events.FileLocked: p("file-locked", e.Ref) case events.FileUnlocked: p("file-unlocked", e.Ref) case events.FileTouched: p("file-touched", e.Ref) + case events.SpaceShared: + r, _ := storagespace.ParseReference(e.ID.GetOpaqueId()) + p("space-member-added", &r) + case events.SpaceShareUpdated: + r, _ := storagespace.ParseReference(e.ID.GetOpaqueId()) + p("space-share-updated", &r) + case events.SpaceUnshared: + r, _ := storagespace.ParseReference(e.ID.GetOpaqueId()) + p("space-member-removed", &r) + users, err = addSharees(ctx, users, gwc, e.GranteeUserID, e.GranteeGroupID) + case events.ShareCreated: + p("share-created", &provider.Reference{ResourceId: e.ItemID}) + users, err = addSharees(ctx, users, gwc, e.GranteeUserID, e.GranteeGroupID) + case events.ShareUpdated: + p("share-updated", &provider.Reference{ResourceId: e.ItemID}) + users, err = addSharees(ctx, users, gwc, e.GranteeUserID, e.GranteeGroupID) + case events.ShareRemoved: + p("share-removed", &provider.Reference{ResourceId: e.ItemID}) + users, err = addSharees(ctx, users, gwc, e.GranteeUserID, e.GranteeGroupID) } if err != nil { @@ -203,3 +220,12 @@ func processFileEvent(ctx context.Context, ref *provider.Reference, gwc gateway. users, err := utils.GetSpaceMembers(ctx, info.GetSpace().GetId().GetOpaqueId(), gwc, utils.ViewerRole) return users, data, err } + +// adds userid to users slice or gets members of groupid and adds them to users slice +func addSharees(ctx context.Context, users []string, gwc gateway.GatewayAPIClient, uid *user.UserId, gid *group.GroupId) ([]string, error) { + if uid != nil { + return append(users, uid.GetOpaqueId()), nil + } + us, err := utils.GetGroupMembers(ctx, gid.GetOpaqueId(), gwc) + return append(users, us...), err +} diff --git a/vendor/github.com/cs3org/reva/v2/internal/grpc/interceptors/eventsmiddleware/conversion.go b/vendor/github.com/cs3org/reva/v2/internal/grpc/interceptors/eventsmiddleware/conversion.go index edcab27e3..1a123f1ca 100644 --- a/vendor/github.com/cs3org/reva/v2/internal/grpc/interceptors/eventsmiddleware/conversion.go +++ b/vendor/github.com/cs3org/reva/v2/internal/grpc/interceptors/eventsmiddleware/conversion.go @@ -356,6 +356,18 @@ func SpaceShared(r *provider.AddGrantResponse, req *provider.AddGrantRequest, ex } } +// SpaceShareUpdated converts the response to an events +func SpaceShareUpdated(r *provider.UpdateGrantResponse, req *provider.UpdateGrantRequest, executant *user.UserId) events.SpaceShareUpdated { + id := storagespace.FormatStorageID(req.Ref.ResourceId.StorageId, req.Ref.ResourceId.SpaceId) + return events.SpaceShareUpdated{ + Executant: executant, + GranteeUserID: req.Grant.GetGrantee().GetUserId(), + GranteeGroupID: req.Grant.GetGrantee().GetGroupId(), + ID: &provider.StorageSpaceId{OpaqueId: id}, + Timestamp: time.Now(), + } +} + // SpaceUnshared converts the response to an event func SpaceUnshared(r *provider.RemoveGrantResponse, req *provider.RemoveGrantRequest, executant *user.UserId) events.SpaceUnshared { id := storagespace.FormatStorageID(req.Ref.ResourceId.StorageId, req.Ref.ResourceId.SpaceId) diff --git a/vendor/github.com/cs3org/reva/v2/internal/grpc/interceptors/eventsmiddleware/events.go b/vendor/github.com/cs3org/reva/v2/internal/grpc/interceptors/eventsmiddleware/events.go index 4735b3458..ca4086bfb 100644 --- a/vendor/github.com/cs3org/reva/v2/internal/grpc/interceptors/eventsmiddleware/events.go +++ b/vendor/github.com/cs3org/reva/v2/internal/grpc/interceptors/eventsmiddleware/events.go @@ -123,6 +123,11 @@ func NewUnary(m map[string]interface{}) (grpc.UnaryServerInterceptor, int, error if isSuccess(v) && utils.ExistsInOpaque(r.Opaque, "spacegrant") { ev = SpaceShared(v, r, executantID) } + case *provider.UpdateGrantResponse: + r := req.(*provider.UpdateGrantRequest) + if isSuccess(v) && utils.ExistsInOpaque(r.Opaque, "spacegrant") { + ev = SpaceShareUpdated(v, r, executantID) + } case *provider.RemoveGrantResponse: r := req.(*provider.RemoveGrantRequest) if isSuccess(v) && utils.ExistsInOpaque(r.Opaque, "spacegrant") { diff --git a/vendor/github.com/cs3org/reva/v2/internal/grpc/services/gateway/gateway.go b/vendor/github.com/cs3org/reva/v2/internal/grpc/services/gateway/gateway.go index 55a5ea8a7..d16bf42cb 100644 --- a/vendor/github.com/cs3org/reva/v2/internal/grpc/services/gateway/gateway.go +++ b/vendor/github.com/cs3org/reva/v2/internal/grpc/services/gateway/gateway.go @@ -131,7 +131,7 @@ func (c *config) init() { } if c.CreatePersonalSpaceCacheConfig.Store == "" { - c.CreatePersonalSpaceCacheConfig.Store = "noop" + c.CreatePersonalSpaceCacheConfig.Store = "memory" } if c.CreatePersonalSpaceCacheConfig.Database == "" { diff --git a/vendor/github.com/cs3org/reva/v2/internal/grpc/services/publicshareprovider/publicshareprovider.go b/vendor/github.com/cs3org/reva/v2/internal/grpc/services/publicshareprovider/publicshareprovider.go index b987b96ac..4879d7caf 100644 --- a/vendor/github.com/cs3org/reva/v2/internal/grpc/services/publicshareprovider/publicshareprovider.go +++ b/vendor/github.com/cs3org/reva/v2/internal/grpc/services/publicshareprovider/publicshareprovider.go @@ -554,12 +554,24 @@ func (s *service) UpdatePublicShare(ctx context.Context, req *link.UpdatePublicS } updatePassword := req.GetUpdate().GetType() == link.UpdatePublicShareRequest_Update_TYPE_PASSWORD setPassword := grant.GetPassword() + + // we update permissions with an empty password and password is not set on the public share + emptyPasswordInPermissionUpdate := len(setPassword) == 0 && updatePermissions && !ps.PasswordProtected + + // password is updated, we use the current permissions to check if the user can opt out if updatePassword && !isInternalLink && enforcePassword(canOptOut, ps.GetPermissions().GetPermissions(), s.conf) && len(setPassword) == 0 { return &link.UpdatePublicShareResponse{ Status: status.NewInvalidArg(ctx, "password protection is enforced"), }, nil } + // permissions are updated, we use the new permissions to check if the user can opt out + if emptyPasswordInPermissionUpdate && !isInternalLink && enforcePassword(canOptOut, grant.GetPermissions().GetPermissions(), s.conf) && len(setPassword) == 0 { + return &link.UpdatePublicShareResponse{ + Status: status.NewInvalidArg(ctx, "password protection is enforced"), + }, nil + } + // validate password policy if updatePassword && len(setPassword) > 0 { if err := s.passwordValidator.Validate(setPassword); err != nil { diff --git a/vendor/github.com/cs3org/reva/v2/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/shares.go b/vendor/github.com/cs3org/reva/v2/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/shares.go index 4a46ebaa1..ef88c9acb 100644 --- a/vendor/github.com/cs3org/reva/v2/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/shares.go +++ b/vendor/github.com/cs3org/reva/v2/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/shares.go @@ -1147,7 +1147,7 @@ func (h *Handler) listSharesWithOthers(w http.ResponseWriter, r *http.Request) { s := r.URL.Query().Get("space") spaceRef := r.URL.Query().Get("space_ref") ctx := r.Context() - sublog := appctx.GetLogger(ctx).With().Str("path", p).Str("space", s).Str("space_ref", spaceRef).Logger() + sublog := appctx.GetLogger(ctx).With().Str("path", p).Str("spaceid", s).Str("space_ref", spaceRef).Logger() if p != "" || s != "" || spaceRef != "" { ref, err := h.extractReference(r) if err != nil { diff --git a/vendor/github.com/cs3org/reva/v2/pkg/events/spaces.go b/vendor/github.com/cs3org/reva/v2/pkg/events/spaces.go index 03ab3ff0a..c24ac4828 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/events/spaces.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/events/spaces.go @@ -125,6 +125,22 @@ func (SpaceShared) Unmarshal(v []byte) (interface{}, error) { return e, err } +// SpaceShareUpdated is emitted when a space share is updated +type SpaceShareUpdated struct { + Executant *user.UserId + GranteeUserID *user.UserId + GranteeGroupID *group.GroupId + ID *provider.StorageSpaceId + Timestamp time.Time +} + +// Unmarshal to fulfill umarshaller interface +func (SpaceShareUpdated) Unmarshal(v []byte) (interface{}, error) { + ev := SpaceShareUpdated{} + err := json.Unmarshal(v, &ev) + return ev, err +} + // SpaceUnshared is emitted when a space is unshared type SpaceUnshared struct { Executant *user.UserId diff --git a/vendor/github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/spaces/spaces.go b/vendor/github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/spaces/spaces.go index ce585ab54..1e06a2fed 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/spaces/spaces.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/spaces/spaces.go @@ -80,7 +80,7 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) { var spaceID string spaceID, r.URL.Path = router.ShiftPath(r.URL.Path) - sublog := appctx.GetLogger(ctx).With().Str("datatx", "spaces").Str("space", spaceID).Logger() + sublog := appctx.GetLogger(ctx).With().Str("datatx", "spaces").Str("spaceid", spaceID).Logger() switch r.Method { case "GET", "HEAD": diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/fs/posix/lookup/lookup.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/fs/posix/lookup/lookup.go index d64223f7d..d08d94fd5 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/fs/posix/lookup/lookup.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/fs/posix/lookup/lookup.go @@ -199,7 +199,8 @@ func (lu *Lookup) Path(ctx context.Context, n *node.Node, hasPermission node.Per appctx.GetLogger(ctx). Error().Err(err). Str("path", p). - Interface("node", n). + Str("spaceid", n.SpaceID). + Str("nodeid", n.ID). Msg("Path()") return } diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/decomposedfs.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/decomposedfs.go index a9754c074..9af86b4c4 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -246,9 +246,10 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { for event := range ch { switch ev := event.Event.(type) { case events.PostprocessingFinished: + sublog := log.With().Str("event", "PostprocessingFinished").Str("uploadid", ev.UploadID).Logger() session, err := fs.sessionStore.Get(ctx, ev.UploadID) if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload") + sublog.Error().Err(err).Msg("Failed to get upload") continue // NOTE: since we can't get the upload, we can't delete the blob } @@ -256,11 +257,12 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { n, err := session.Node(ctx) if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not read node") + sublog.Error().Err(err).Msg("could not read node") continue } + sublog = log.With().Str("spaceid", session.SpaceID()).Str("nodeid", session.NodeID()).Logger() if !n.Exists { - log.Debug().Str("uploadID", ev.UploadID).Str("nodeID", session.NodeID()).Msg("node no longer exists") + sublog.Debug().Msg("node no longer exists") fs.sessionStore.Cleanup(ctx, session, false, false, false) continue } @@ -274,7 +276,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { switch ev.Outcome { default: - log.Error().Str("outcome", string(ev.Outcome)).Str("uploadID", ev.UploadID).Msg("unknown postprocessing outcome - aborting") + sublog.Error().Str("outcome", string(ev.Outcome)).Msg("unknown postprocessing outcome - aborting") fallthrough case events.PPOutcomeAbort: failed = true @@ -283,7 +285,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { metrics.UploadSessionsAborted.Inc() case events.PPOutcomeContinue: if err := session.Finalize(); err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not finalize upload") + sublog.Error().Err(err).Msg("could not finalize upload") failed = true revertNodeMetadata = false keepUpload = true @@ -301,7 +303,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { getParent := func() *node.Node { p, err := n.Parent(ctx) if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not read parent") + sublog.Error().Err(err).Msg("could not read parent") return nil } return p @@ -309,15 +311,22 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { now := time.Now() if failed { - // propagate sizeDiff after failed postprocessing - if err := fs.tp.Propagate(ctx, n, -session.SizeDiff()); err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not propagate tree size change") + // if no other upload session is in progress (processing id != session id) or has finished (processing id == "") + latestSession, err := n.ProcessingID(ctx) + if err != nil { + sublog.Error().Err(err).Msg("reading node for session failed") + } + if latestSession == session.ID() { + // propagate reverted sizeDiff after failed postprocessing + if err := fs.tp.Propagate(ctx, n, -session.SizeDiff()); err != nil { + sublog.Error().Err(err).Msg("could not propagate tree size change") + } } } else if p := getParent(); p != nil { // update parent tmtime to propagate etag change after successful postprocessing _ = p.SetTMTime(ctx, &now) if err := fs.tp.Propagate(ctx, p, 0); err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not propagate etag change") + sublog.Error().Err(err).Msg("could not propagate etag change") } } @@ -343,22 +352,24 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { SpaceOwner: n.SpaceOwnerOrManager(ctx), }, ); err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish UploadReady event") + sublog.Error().Err(err).Msg("Failed to publish UploadReady event") } case events.RestartPostprocessing: + sublog := log.With().Str("event", "RestartPostprocessing").Str("uploadid", ev.UploadID).Logger() session, err := fs.sessionStore.Get(ctx, ev.UploadID) if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload") + sublog.Error().Err(err).Msg("Failed to get upload") continue } n, err := session.Node(ctx) if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not read node") + sublog.Error().Err(err).Msg("could not read node") continue } + sublog = log.With().Str("spaceid", session.SpaceID()).Str("nodeid", session.NodeID()).Logger() s, err := session.URL(ctx) if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not create url") + sublog.Error().Err(err).Msg("could not create url") continue } @@ -374,9 +385,10 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { Filename: session.Filename(), Filesize: uint64(session.Size()), }); err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish BytesReceived event") + sublog.Error().Err(err).Msg("Failed to publish BytesReceived event") } case events.PostprocessingStepFinished: + sublog := log.With().Str("event", "PostprocessingStepFinished").Str("uploadid", ev.UploadID).Logger() if ev.FinishedStep != events.PPStepAntivirus { // atm we are only interested in antivirus results continue @@ -388,6 +400,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { // Should we handle this here? continue } + sublog = log.With().Str("scan_description", res.Description).Bool("infected", res.Infected).Logger() var n *node.Node switch ev.UploadID { @@ -473,19 +486,20 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { // uploadid is not empty -> this is an async upload session, err := fs.sessionStore.Get(ctx, ev.UploadID) if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload") + sublog.Error().Err(err).Msg("Failed to get upload") continue } n, err = session.Node(ctx) if err != nil { - log.Error().Err(err).Interface("uploadID", ev.UploadID).Msg("Failed to get node after scan") + sublog.Error().Err(err).Msg("Failed to get node after scan") continue } + sublog = log.With().Str("spaceid", session.SpaceID()).Str("nodeid", session.NodeID()).Logger() } if err := n.SetScanData(ctx, res.Description, res.Scandate); err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Interface("resourceID", res.ResourceID).Msg("Failed to set scan results") + sublog.Error().Err(err).Msg("Failed to set scan results") continue } diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup/lookup.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup/lookup.go index bd432c5fd..e7a813983 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup/lookup.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup/lookup.go @@ -211,7 +211,8 @@ func (lu *Lookup) Path(ctx context.Context, n *node.Node, hasPermission node.Per appctx.GetLogger(ctx). Error().Err(err). Str("path", p). - Interface("node", n). + Str("spaceid", n.SpaceID). + Str("nodeid", n.ID). Msg("Path()") return } diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata.go index ec5fed27c..41a44a75f 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata.go @@ -42,7 +42,7 @@ func (fs *Decomposedfs) SetArbitraryMetadata(ctx context.Context, ref *provider. if err != nil { return errors.Wrap(err, "Decomposedfs: error resolving ref") } - sublog := appctx.GetLogger(ctx).With().Interface("node", n).Logger() + sublog := appctx.GetLogger(ctx).With().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Logger() if !n.Exists { err = errtypes.NotFound(filepath.Join(n.ParentID, n.Name)) @@ -135,7 +135,7 @@ func (fs *Decomposedfs) UnsetArbitraryMetadata(ctx context.Context, ref *provide if err != nil { return errors.Wrap(err, "Decomposedfs: error resolving ref") } - sublog := appctx.GetLogger(ctx).With().Interface("node", n).Logger() + sublog := appctx.GetLogger(ctx).With().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Logger() if !n.Exists { err = errtypes.NotFound(filepath.Join(n.ParentID, n.Name)) diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/node.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/node.go index f1db7f5eb..48432f224 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/node.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/node.go @@ -513,7 +513,7 @@ func (n *Node) readOwner(ctx context.Context) (*userpb.UserId, error) { func (n *Node) PermissionSet(ctx context.Context) (provider.ResourcePermissions, bool) { u, ok := ctxpkg.ContextGetUser(ctx) if !ok { - appctx.GetLogger(ctx).Debug().Interface("node", n).Msg("no user in context, returning default permissions") + appctx.GetLogger(ctx).Debug().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Msg("no user in context, returning default permissions") return NoPermissions(), false } if utils.UserEqual(u.Id, n.SpaceRoot.Owner()) { @@ -582,7 +582,7 @@ func (n *Node) SetMtime(ctx context.Context, t *time.Time) (err error) { // SetEtag sets the temporary etag of a node if it differs from the current etag func (n *Node) SetEtag(ctx context.Context, val string) (err error) { - sublog := appctx.GetLogger(ctx).With().Interface("node", n).Logger() + sublog := appctx.GetLogger(ctx).With().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Logger() var tmTime time.Time if tmTime, err = n.GetTMTime(ctx); err != nil { return @@ -635,7 +635,7 @@ func (n *Node) IsDir(ctx context.Context) bool { // AsResourceInfo return the node as CS3 ResourceInfo func (n *Node) AsResourceInfo(ctx context.Context, rp *provider.ResourcePermissions, mdKeys, fieldMask []string, returnBasename bool) (ri *provider.ResourceInfo, err error) { - sublog := appctx.GetLogger(ctx).With().Interface("node", n.ID).Logger() + sublog := appctx.GetLogger(ctx).With().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Logger() var fn string nodeType := n.Type(ctx) @@ -844,9 +844,9 @@ func (n *Node) readChecksumIntoResourceChecksum(ctx context.Context, algo string Sum: hex.EncodeToString(v), } case metadata.IsAttrUnset(err): - appctx.GetLogger(ctx).Debug().Err(err).Str("nodepath", n.InternalPath()).Str("algorithm", algo).Msg("checksum not set") + appctx.GetLogger(ctx).Debug().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Str("nodepath", n.InternalPath()).Str("algorithm", algo).Msg("checksum not set") default: - appctx.GetLogger(ctx).Error().Err(err).Str("nodepath", n.InternalPath()).Str("algorithm", algo).Msg("could not read checksum") + appctx.GetLogger(ctx).Error().Err(err).Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Str("nodepath", n.InternalPath()).Str("algorithm", algo).Msg("could not read checksum") } } @@ -864,9 +864,9 @@ func (n *Node) readChecksumIntoOpaque(ctx context.Context, algo string, ri *prov Value: []byte(hex.EncodeToString(v)), } case metadata.IsAttrUnset(err): - appctx.GetLogger(ctx).Debug().Err(err).Str("nodepath", n.InternalPath()).Str("algorithm", algo).Msg("checksum not set") + appctx.GetLogger(ctx).Debug().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Str("nodepath", n.InternalPath()).Str("algorithm", algo).Msg("checksum not set") default: - appctx.GetLogger(ctx).Error().Err(err).Str("nodepath", n.InternalPath()).Str("algorithm", algo).Msg("could not read checksum") + appctx.GetLogger(ctx).Error().Err(err).Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Str("nodepath", n.InternalPath()).Str("algorithm", algo).Msg("could not read checksum") } } @@ -891,12 +891,12 @@ func (n *Node) readQuotaIntoOpaque(ctx context.Context, ri *provider.ResourceInf Value: []byte(v), } } else { - appctx.GetLogger(ctx).Error().Err(err).Str("nodepath", n.InternalPath()).Str("quota", v).Msg("malformed quota") + appctx.GetLogger(ctx).Error().Err(err).Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Str("nodepath", n.InternalPath()).Str("quota", v).Msg("malformed quota") } case metadata.IsAttrUnset(err): - appctx.GetLogger(ctx).Debug().Err(err).Str("nodepath", n.InternalPath()).Msg("quota not set") + appctx.GetLogger(ctx).Debug().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Str("nodepath", n.InternalPath()).Msg("quota not set") default: - appctx.GetLogger(ctx).Error().Err(err).Str("nodepath", n.InternalPath()).Msg("could not read quota") + appctx.GetLogger(ctx).Error().Err(err).Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Str("nodepath", n.InternalPath()).Msg("could not read quota") } } @@ -1014,7 +1014,7 @@ func isGrantExpired(g *provider.Grant) bool { func (n *Node) ReadUserPermissions(ctx context.Context, u *userpb.User) (ap provider.ResourcePermissions, accessDenied bool, err error) { // check if the current user is the owner if utils.UserEqual(u.Id, n.Owner()) { - appctx.GetLogger(ctx).Debug().Str("node", n.ID).Msg("user is owner, returning owner permissions") + appctx.GetLogger(ctx).Debug().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Msg("user is owner, returning owner permissions") return OwnerPermissions(), false, nil } @@ -1032,7 +1032,7 @@ func (n *Node) ReadUserPermissions(ctx context.Context, u *userpb.User) (ap prov // we read all grantees from the node var grantees []string if grantees, err = n.ListGrantees(ctx); err != nil { - appctx.GetLogger(ctx).Error().Err(err).Interface("node", n).Msg("error listing grantees") + appctx.GetLogger(ctx).Error().Err(err).Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Msg("error listing grantees") return NoPermissions(), true, err } @@ -1073,15 +1073,15 @@ func (n *Node) ReadUserPermissions(ctx context.Context, u *userpb.User) (ap prov } AddPermissions(&ap, g.GetPermissions()) case metadata.IsAttrUnset(err): - appctx.GetLogger(ctx).Error().Interface("node", n).Str("grant", grantees[i]).Interface("grantees", grantees).Msg("grant vanished from node after listing") + appctx.GetLogger(ctx).Error().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Str("grant", grantees[i]).Interface("grantees", grantees).Msg("grant vanished from node after listing") // continue with next segment default: - appctx.GetLogger(ctx).Error().Err(err).Interface("node", n).Str("grant", grantees[i]).Msg("error reading permissions") + appctx.GetLogger(ctx).Error().Err(err).Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Str("grant", grantees[i]).Msg("error reading permissions") // continue with next segment } } - appctx.GetLogger(ctx).Debug().Interface("permissions", ap).Interface("node", n).Interface("user", u).Msg("returning aggregated permissions") + appctx.GetLogger(ctx).Debug().Interface("permissions", ap).Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Interface("user", u).Msg("returning aggregated permissions") return ap, false, nil } @@ -1135,7 +1135,7 @@ func (n *Node) IsDenied(ctx context.Context) bool { func (n *Node) ListGrantees(ctx context.Context) (grantees []string, err error) { attrs, err := n.Xattrs(ctx) if err != nil { - appctx.GetLogger(ctx).Error().Err(err).Str("node", n.ID).Msg("error listing attributes") + appctx.GetLogger(ctx).Error().Err(err).Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Msg("error listing attributes") return nil, err } for name := range attrs { @@ -1202,7 +1202,8 @@ func (n *Node) ListGrants(ctx context.Context) ([]*provider.Grant, error) { appctx.GetLogger(ctx). Error(). Err(err). - Str("node", n.ID). + Str("spaceid", n.SpaceID). + Str("nodeid", n.ID). Str("grantee", g). Msg("error reading grant") continue @@ -1276,6 +1277,12 @@ func (n *Node) IsProcessing(ctx context.Context) bool { return err == nil && strings.HasPrefix(v, ProcessingStatus) } +// ProcessingID returns the latest upload session id +func (n *Node) ProcessingID(ctx context.Context) (string, error) { + v, err := n.XattrString(ctx, prefixes.StatusPrefix) + return strings.TrimPrefix(v, ProcessingStatus), err +} + // IsSpaceRoot checks if the node is a space root func (n *Node) IsSpaceRoot(ctx context.Context) bool { _, err := n.Xattr(ctx, prefixes.SpaceNameAttr) diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/permissions.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/permissions.go index 4e3459a94..c8367ef85 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/permissions.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/permissions.go @@ -168,7 +168,7 @@ func (p *Permissions) assemblePermissions(ctx context.Context, n *Node, failOnTr } AddPermissions(&ap, &np) } else { - appctx.GetLogger(ctx).Error().Err(err).Interface("node", cn.ID).Msg("error reading permissions") + appctx.GetLogger(ctx).Error().Err(err).Str("spaceid", cn.SpaceID).Str("nodeid", cn.ID).Msg("error reading permissions") // continue with next segment } @@ -194,7 +194,7 @@ func (p *Permissions) assemblePermissions(ctx context.Context, n *Node, failOnTr } AddPermissions(&ap, &np) } else { - appctx.GetLogger(ctx).Error().Err(err).Interface("node", cn.ID).Msg("error reading root node permissions") + appctx.GetLogger(ctx).Error().Err(err).Str("spaceid", cn.SpaceID).Str("nodeid", cn.ID).Msg("error reading root node permissions") } // check if the current user is the owner @@ -202,7 +202,7 @@ func (p *Permissions) assemblePermissions(ctx context.Context, n *Node, failOnTr return OwnerPermissions(), nil } - appctx.GetLogger(ctx).Debug().Interface("permissions", ap).Interface("node", n.ID).Interface("user", u).Msg("returning agregated permissions") + appctx.GetLogger(ctx).Debug().Interface("permissions", ap).Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Interface("user", u).Msg("returning agregated permissions") return ap, nil } diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/recycle.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/recycle.go index 72c2b2264..aa0ead067 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/recycle.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/recycle.go @@ -57,7 +57,7 @@ func (fs *Decomposedfs) ListRecycle(ctx context.Context, ref *provider.Reference } spaceID := ref.ResourceId.OpaqueId - sublog := appctx.GetLogger(ctx).With().Str("space", spaceID).Str("key", key).Str("relative_path", relativePath).Logger() + sublog := appctx.GetLogger(ctx).With().Str("spaceid", spaceID).Str("key", key).Str("relative_path", relativePath).Logger() // check permissions trashnode, err := fs.lu.NodeFromSpaceID(ctx, spaceID) @@ -98,7 +98,7 @@ func (fs *Decomposedfs) ListRecycle(ctx context.Context, ref *provider.Reference if attrBytes, ok := attrs[prefixes.TrashOriginAttr]; ok { origin = string(attrBytes) } else { - sublog.Error().Err(err).Str("space", spaceID).Msg("could not read origin path, skipping") + sublog.Error().Err(err).Str("spaceid", spaceID).Msg("could not read origin path, skipping") return nil, err } @@ -291,14 +291,14 @@ func (fs *Decomposedfs) listTrashRoot(ctx context.Context, spaceID string) ([]*p // TODO nanos } } else { - log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not parse time format, ignoring") + log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("spaceid", spaceID).Str("nodeid", nodeID).Str("dtime", timeSuffix).Msg("could not parse time format, ignoring") } // lookup origin path in extended attributes if attr, ok := attrs[prefixes.TrashOriginAttr]; ok { item.Ref = &provider.Reference{Path: string(attr)} } else { - log.Error().Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not read origin path") + log.Error().Str("trashRoot", trashRoot).Str("item", itemPath).Str("spaceid", spaceID).Str("nodeid", nodeID).Str("dtime", timeSuffix).Msg("could not read origin path") } select { case results <- item: diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/spaces.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/spaces.go index 0c1f541f8..6a555ea12 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/spaces.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/spaces.go @@ -808,7 +808,7 @@ func (fs *Decomposedfs) storageSpaceFromNode(ctx context.Context, n *node.Node, } } - sublog := appctx.GetLogger(ctx).With().Str("space", n.SpaceRoot.ID).Logger() + sublog := appctx.GetLogger(ctx).With().Str("spaceid", n.SpaceID).Logger() var err error // TODO apply more filters diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload.go index 02f74ce26..03c757b98 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload.go @@ -223,7 +223,7 @@ func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Refere } } - log.Debug().Interface("session", session).Interface("node", n).Interface("metadata", metadata).Msg("Decomposedfs: resolved filename") + log.Debug().Str("uploadid", session.ID()).Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Interface("metadata", metadata).Msg("Decomposedfs: resolved filename") _, err = node.CheckQuota(ctx, n.SpaceRoot, n.Exists, uint64(n.Blobsize), uint64(session.Size())) if err != nil { diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/store.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/store.go index 57a7aea8f..b880b39ba 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/store.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/store.go @@ -205,15 +205,23 @@ func (store OcisStore) CreateNodeForUpload(session *OcisSession, initAttrs node. } var f *lockedfile.File - if session.NodeExists() { + if session.NodeExists() { // TODO this is wrong. The node should be created when the upload starts, the revisions should be created independently of the node + // we do not need to propagate a change when a node is created, only when the upload is ready. + // that still creates problems for desktop clients because if another change causes propagation it will detects an empty file + // so the first upload has to point to the first revision with the expected size. The file cannot be downloaded, but it can be overwritten (which will create a new revision and make the node reflect the latest revision) + // any finished postprocessing will not affect the node metadata. + // *thinking* but then initializing an upload will lock the file until the upload has finished. That sucks. + // so we have to check if the node has been created meanwhile (well, only in case the upload does not know the nodeid ... or the NodeExists array that is checked by session.NodeExists()) + // FIXME look at the disk again to see if the file has been created in between, or just try initializing a new node and do the update existing node as a fallback. <- the latter! + f, err = store.updateExistingNode(ctx, session, n, session.SpaceID(), uint64(session.Size())) if f != nil { - appctx.GetLogger(ctx).Info().Str("lockfile", f.Name()).Interface("err", err).Msg("got lock file from updateExistingNode") + appctx.GetLogger(ctx).Debug().Str("lockfile", f.Name()).Interface("err", err).Msg("got lock file from updateExistingNode") } } else { f, err = store.initNewNode(ctx, session, n, uint64(session.Size())) if f != nil { - appctx.GetLogger(ctx).Info().Str("lockfile", f.Name()).Interface("err", err).Msg("got lock file from initNewNode") + appctx.GetLogger(ctx).Debug().Str("lockfile", f.Name()).Interface("err", err).Msg("got lock file from initNewNode") } } defer func() { diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/upload.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/upload.go index 9244052a1..fe2861e55 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/upload.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/upload.go @@ -278,14 +278,12 @@ func (session *OcisSession) ConcatUploads(_ context.Context, uploads []tusd.Uplo func (session *OcisSession) Finalize() (err error) { ctx, span := tracer.Start(session.Context(context.Background()), "Finalize") defer span.End() - n, err := session.Node(ctx) - if err != nil { - return err - } + + revisionNode := &node.Node{SpaceID: session.SpaceID(), BlobID: session.ID(), Blobsize: session.Size()} // upload the data to the blobstore _, subspan := tracer.Start(ctx, "WriteBlob") - err = session.store.tp.WriteBlob(n, session.binPath()) + err = session.store.tp.WriteBlob(revisionNode, session.binPath()) subspan.End() if err != nil { return errors.Wrap(err, "failed to upload file to blobstore") @@ -318,12 +316,12 @@ func (session *OcisSession) Cleanup(revertNodeMetadata, cleanBin, cleanInfo bool ctx := session.Context(context.Background()) if revertNodeMetadata { + n, err := session.Node(ctx) + if err != nil { + appctx.GetLogger(ctx).Error().Err(err).Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Str("uploadid", session.ID()).Msg("reading node for session failed") + } if session.NodeExists() { p := session.info.MetaData["versionsPath"] - n, err := session.Node(ctx) - if err != nil { - appctx.GetLogger(ctx).Error().Err(err).Str("sessionid", session.ID()).Msg("reading node for session failed") - } if err := session.store.lu.CopyMetadata(ctx, p, n.InternalPath(), func(attributeName string, value []byte) (newValue []byte, copy bool) { return value, strings.HasPrefix(attributeName, prefixes.ChecksumPrefix) || attributeName == prefixes.TypeAttr || @@ -339,7 +337,16 @@ func (session *OcisSession) Cleanup(revertNodeMetadata, cleanBin, cleanInfo bool } } else { - session.removeNode(ctx) + // if no other upload session is in progress (processing id != session id) or has finished (processing id == "") + latestSession, err := n.ProcessingID(ctx) + if err != nil { + appctx.GetLogger(ctx).Error().Err(err).Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Str("uploadid", session.ID()).Msg("reading processingid for session failed") + } + if latestSession == session.ID() { + // actually delete the node + session.removeNode(ctx) + } + // FIXME else if the upload has become a revision, delete the revision, or if it is the last one, delete the node } } diff --git a/vendor/modules.txt b/vendor/modules.txt index d896e9457..7046c0481 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -366,7 +366,7 @@ github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1 github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1 github.com/cs3org/go-cs3apis/cs3/tx/v1beta1 github.com/cs3org/go-cs3apis/cs3/types/v1beta1 -# github.com/cs3org/reva/v2 v2.19.2-0.20240405190914-ef59ba20ef0e +# github.com/cs3org/reva/v2 v2.19.2-0.20240415154646-5c2c9831a2b7 ## explicit; go 1.21 github.com/cs3org/reva/v2/cmd/revad/internal/grace github.com/cs3org/reva/v2/cmd/revad/runtime