From 6aa71d74d24e4cdff143ccfc945686ef3a811537 Mon Sep 17 00:00:00 2001 From: jkoberg Date: Mon, 15 Apr 2024 14:55:53 +0200 Subject: [PATCH 1/5] feat(clientlog): item-moved sse Signed-off-by: jkoberg --- services/clientlog/pkg/service/service.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/services/clientlog/pkg/service/service.go b/services/clientlog/pkg/service/service.go index a07fe64022..717104e75e 100644 --- a/services/clientlog/pkg/service/service.go +++ b/services/clientlog/pkg/service/service.go @@ -148,11 +148,12 @@ func (cl *ClientlogService) processEvent(event events.Event) { case events.ContainerCreated: p("folder-created", e.Ref) case events.ItemMoved: - // we are only interested in the rename case - if !utils.ResourceIDEqual(e.OldReference.GetResourceId(), e.Ref.GetResourceId()) || e.Ref.GetPath() == e.OldReference.GetPath() { - return + // we send a dedicated event in case the item was only renamed + if utils.ResourceIDEqual(e.OldReference.GetResourceId(), e.Ref.GetResourceId()) || e.Ref.GetPath() == e.OldReference.GetPath() { + p("item-renamed", e.Ref) + } else { + p("item-moved", e.Ref) } - p("item-renamed", e.Ref) case events.FileLocked: p("file-locked", e.Ref) case events.FileUnlocked: From 370522e7cfa219e81bab9cd2e7554ea6e427c354 Mon Sep 17 00:00:00 2001 From: jkoberg Date: Mon, 15 Apr 2024 15:48:20 +0200 Subject: [PATCH 2/5] feat(clientlog): space/share created/removed sse Signed-off-by: jkoberg --- changelog/unreleased/sharing-sse.md | 5 ++++ services/clientlog/pkg/command/server.go | 4 ++++ services/clientlog/pkg/service/service.go | 29 +++++++++++++++++++---- 3 files changed, 33 insertions(+), 5 deletions(-) create mode 100644 changelog/unreleased/sharing-sse.md diff --git a/changelog/unreleased/sharing-sse.md b/changelog/unreleased/sharing-sse.md new file mode 100644 index 0000000000..3a3a2d7d86 --- /dev/null +++ b/changelog/unreleased/sharing-sse.md @@ -0,0 +1,5 @@ +Enhancement: Sharing SSEs + +Added server side events for item moved, share created/removed, space membership created/removed, share/space membership updated. + +https://github.com/owncloud/ocis/pull/8854 diff --git a/services/clientlog/pkg/command/server.go b/services/clientlog/pkg/command/server.go index f3cab7adae..0fcbf60959 100644 --- a/services/clientlog/pkg/command/server.go +++ b/services/clientlog/pkg/command/server.go @@ -33,6 +33,10 @@ var _registeredEvents = []events.Unmarshaller{ events.FileLocked{}, events.FileUnlocked{}, events.FileTouched{}, + events.SpaceShared{}, + events.SpaceUnshared{}, + events.ShareCreated{}, + events.ShareRemoved{}, } // Server is the entrypoint for the server command. diff --git a/services/clientlog/pkg/service/service.go b/services/clientlog/pkg/service/service.go index 717104e75e..47fe6df688 100644 --- a/services/clientlog/pkg/service/service.go +++ b/services/clientlog/pkg/service/service.go @@ -8,6 +8,8 @@ import ( "reflect" gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" + group "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1" + user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "go.opentelemetry.io/otel/trace" @@ -90,11 +92,6 @@ func (cl *ClientlogService) processEvent(event events.Event) { return } - gwc, err = cl.gatewaySelector.Next() - if err != nil { - cl.log.Error().Err(err).Interface("event", event).Msg("error getting gateway client") - return - } var ( users []string evType string @@ -160,6 +157,19 @@ func (cl *ClientlogService) processEvent(event events.Event) { p("file-unlocked", e.Ref) case events.FileTouched: p("file-touched", e.Ref) + case events.SpaceShared: + r, _ := storagespace.ParseReference(e.ID.GetOpaqueId()) + p("space-member-added", &r) + case events.SpaceUnshared: + r, _ := storagespace.ParseReference(e.ID.GetOpaqueId()) + p("space-member-removed", &r) + users, err = addSharees(ctx, users, gwc, e.GranteeUserID, e.GranteeGroupID) + case events.ShareCreated: + p("share-created", &provider.Reference{ResourceId: e.ItemID}) + users, err = addSharees(ctx, users, gwc, e.GranteeUserID, e.GranteeGroupID) + case events.ShareRemoved: + p("share-removed", &provider.Reference{ResourceId: e.ItemID}) + users, err = addSharees(ctx, users, gwc, e.GranteeUserID, e.GranteeGroupID) } if err != nil { @@ -204,3 +214,12 @@ func processFileEvent(ctx context.Context, ref *provider.Reference, gwc gateway. users, err := utils.GetSpaceMembers(ctx, info.GetSpace().GetId().GetOpaqueId(), gwc, utils.ViewerRole) return users, data, err } + +// adds userid to users slice or gets members of groupid and adds them to users slice +func addSharees(ctx context.Context, users []string, gwc gateway.GatewayAPIClient, uid *user.UserId, gid *group.GroupId) ([]string, error) { + if uid != nil { + return append(users, uid.GetOpaqueId()), nil + } + us, err := utils.GetGroupMembers(ctx, gid.GetOpaqueId(), gwc) + return append(users, us...), err +} From b90c9d8af08320b2b8e9917446610f420266b79c Mon Sep 17 00:00:00 2001 From: jkoberg Date: Mon, 15 Apr 2024 16:30:20 +0200 Subject: [PATCH 3/5] feat(clientlog): share-updated sse Signed-off-by: jkoberg --- changelog/unreleased/sharing-sse.md | 2 +- services/clientlog/pkg/command/server.go | 1 + services/clientlog/pkg/service/service.go | 3 +++ 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/changelog/unreleased/sharing-sse.md b/changelog/unreleased/sharing-sse.md index 3a3a2d7d86..dfc859c88c 100644 --- a/changelog/unreleased/sharing-sse.md +++ b/changelog/unreleased/sharing-sse.md @@ -1,5 +1,5 @@ Enhancement: Sharing SSEs -Added server side events for item moved, share created/removed, space membership created/removed, share/space membership updated. +Added server side events for item moved, share created/updated/removed, space membership created/removed. https://github.com/owncloud/ocis/pull/8854 diff --git a/services/clientlog/pkg/command/server.go b/services/clientlog/pkg/command/server.go index 0fcbf60959..044a66e8b2 100644 --- a/services/clientlog/pkg/command/server.go +++ b/services/clientlog/pkg/command/server.go @@ -37,6 +37,7 @@ var _registeredEvents = []events.Unmarshaller{ events.SpaceUnshared{}, events.ShareCreated{}, events.ShareRemoved{}, + events.ShareUpdated{}, } // Server is the entrypoint for the server command. diff --git a/services/clientlog/pkg/service/service.go b/services/clientlog/pkg/service/service.go index 47fe6df688..a194d6af0d 100644 --- a/services/clientlog/pkg/service/service.go +++ b/services/clientlog/pkg/service/service.go @@ -167,6 +167,9 @@ func (cl *ClientlogService) processEvent(event events.Event) { case events.ShareCreated: p("share-created", &provider.Reference{ResourceId: e.ItemID}) users, err = addSharees(ctx, users, gwc, e.GranteeUserID, e.GranteeGroupID) + case events.ShareUpdated: + p("share-updated", &provider.Reference{ResourceId: e.ItemID}) + users, err = addSharees(ctx, users, gwc, e.GranteeUserID, e.GranteeGroupID) case events.ShareRemoved: p("share-removed", &provider.Reference{ResourceId: e.ItemID}) users, err = addSharees(ctx, users, gwc, e.GranteeUserID, e.GranteeGroupID) From ea20298046690112f47a9e3e4e0da10b5e661d2f Mon Sep 17 00:00:00 2001 From: jkoberg Date: Mon, 15 Apr 2024 17:48:45 +0200 Subject: [PATCH 4/5] feat(reva): bump reva Signed-off-by: jkoberg --- go.mod | 2 +- go.sum | 4 +- .../eventsmiddleware/conversion.go | 12 +++++ .../interceptors/eventsmiddleware/events.go | 5 ++ .../internal/grpc/services/gateway/gateway.go | 2 +- .../publicshareprovider.go | 12 +++++ .../handlers/apps/sharing/shares/shares.go | 2 +- .../cs3org/reva/v2/pkg/events/spaces.go | 16 ++++++ .../pkg/rhttp/datatx/manager/spaces/spaces.go | 2 +- .../v2/pkg/storage/fs/posix/lookup/lookup.go | 3 +- .../utils/decomposedfs/decomposedfs.go | 50 ++++++++++++------- .../utils/decomposedfs/lookup/lookup.go | 3 +- .../storage/utils/decomposedfs/metadata.go | 4 +- .../storage/utils/decomposedfs/node/node.go | 41 ++++++++------- .../utils/decomposedfs/node/permissions.go | 6 +-- .../pkg/storage/utils/decomposedfs/recycle.go | 8 +-- .../pkg/storage/utils/decomposedfs/spaces.go | 2 +- .../pkg/storage/utils/decomposedfs/upload.go | 2 +- .../utils/decomposedfs/upload/store.go | 14 ++++-- .../utils/decomposedfs/upload/upload.go | 27 ++++++---- vendor/modules.txt | 2 +- 21 files changed, 151 insertions(+), 68 deletions(-) diff --git a/go.mod b/go.mod index 57331e69e2..4091b81b99 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/cenkalti/backoff v2.2.1+incompatible github.com/coreos/go-oidc/v3 v3.10.0 github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781 - github.com/cs3org/reva/v2 v2.19.2-0.20240405190914-ef59ba20ef0e + github.com/cs3org/reva/v2 v2.19.2-0.20240415154646-5c2c9831a2b7 github.com/dhowden/tag v0.0.0-20230630033851-978a0926ee25 github.com/disintegration/imaging v1.6.2 github.com/dutchcoders/go-clamd v0.0.0-20170520113014-b970184f4d9e diff --git a/go.sum b/go.sum index 6de6fa1c27..aa4692d1a5 100644 --- a/go.sum +++ b/go.sum @@ -1022,8 +1022,8 @@ github.com/crewjam/saml v0.4.14 h1:g9FBNx62osKusnFzs3QTN5L9CVA/Egfgm+stJShzw/c= github.com/crewjam/saml v0.4.14/go.mod h1:UVSZCf18jJkk6GpWNVqcyQJMD5HsRugBPf4I1nl2mME= github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781 h1:BUdwkIlf8IS2FasrrPg8gGPHQPOrQ18MS1Oew2tmGtY= github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY= -github.com/cs3org/reva/v2 v2.19.2-0.20240405190914-ef59ba20ef0e h1:RAmS/42ZYRrkhvVLWgvJhwjna9zSAqa2DJ8xS7R3Rx0= -github.com/cs3org/reva/v2 v2.19.2-0.20240405190914-ef59ba20ef0e/go.mod h1:GRUrOp5HbFVwZTgR9bVrMZ/MvVy+Jhxw1PdMmhhKP9E= +github.com/cs3org/reva/v2 v2.19.2-0.20240415154646-5c2c9831a2b7 h1:7oBqhyPUyWSRNMKG5wLYDm9TSc0gfKj+LeVaUhjm+hQ= +github.com/cs3org/reva/v2 v2.19.2-0.20240415154646-5c2c9831a2b7/go.mod h1:GRUrOp5HbFVwZTgR9bVrMZ/MvVy+Jhxw1PdMmhhKP9E= github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4= github.com/cyphar/filepath-securejoin v0.2.4 h1:Ugdm7cg7i6ZK6x3xDF1oEu1nfkyfH53EtKeQYTC3kyg= github.com/cyphar/filepath-securejoin v0.2.4/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4= diff --git a/vendor/github.com/cs3org/reva/v2/internal/grpc/interceptors/eventsmiddleware/conversion.go b/vendor/github.com/cs3org/reva/v2/internal/grpc/interceptors/eventsmiddleware/conversion.go index edcab27e37..1a123f1ca9 100644 --- a/vendor/github.com/cs3org/reva/v2/internal/grpc/interceptors/eventsmiddleware/conversion.go +++ b/vendor/github.com/cs3org/reva/v2/internal/grpc/interceptors/eventsmiddleware/conversion.go @@ -356,6 +356,18 @@ func SpaceShared(r *provider.AddGrantResponse, req *provider.AddGrantRequest, ex } } +// SpaceShareUpdated converts the response to an events +func SpaceShareUpdated(r *provider.UpdateGrantResponse, req *provider.UpdateGrantRequest, executant *user.UserId) events.SpaceShareUpdated { + id := storagespace.FormatStorageID(req.Ref.ResourceId.StorageId, req.Ref.ResourceId.SpaceId) + return events.SpaceShareUpdated{ + Executant: executant, + GranteeUserID: req.Grant.GetGrantee().GetUserId(), + GranteeGroupID: req.Grant.GetGrantee().GetGroupId(), + ID: &provider.StorageSpaceId{OpaqueId: id}, + Timestamp: time.Now(), + } +} + // SpaceUnshared converts the response to an event func SpaceUnshared(r *provider.RemoveGrantResponse, req *provider.RemoveGrantRequest, executant *user.UserId) events.SpaceUnshared { id := storagespace.FormatStorageID(req.Ref.ResourceId.StorageId, req.Ref.ResourceId.SpaceId) diff --git a/vendor/github.com/cs3org/reva/v2/internal/grpc/interceptors/eventsmiddleware/events.go b/vendor/github.com/cs3org/reva/v2/internal/grpc/interceptors/eventsmiddleware/events.go index 4735b34589..ca4086bfb2 100644 --- a/vendor/github.com/cs3org/reva/v2/internal/grpc/interceptors/eventsmiddleware/events.go +++ b/vendor/github.com/cs3org/reva/v2/internal/grpc/interceptors/eventsmiddleware/events.go @@ -123,6 +123,11 @@ func NewUnary(m map[string]interface{}) (grpc.UnaryServerInterceptor, int, error if isSuccess(v) && utils.ExistsInOpaque(r.Opaque, "spacegrant") { ev = SpaceShared(v, r, executantID) } + case *provider.UpdateGrantResponse: + r := req.(*provider.UpdateGrantRequest) + if isSuccess(v) && utils.ExistsInOpaque(r.Opaque, "spacegrant") { + ev = SpaceShareUpdated(v, r, executantID) + } case *provider.RemoveGrantResponse: r := req.(*provider.RemoveGrantRequest) if isSuccess(v) && utils.ExistsInOpaque(r.Opaque, "spacegrant") { diff --git a/vendor/github.com/cs3org/reva/v2/internal/grpc/services/gateway/gateway.go b/vendor/github.com/cs3org/reva/v2/internal/grpc/services/gateway/gateway.go index 55a5ea8a7a..d16bf42cb2 100644 --- a/vendor/github.com/cs3org/reva/v2/internal/grpc/services/gateway/gateway.go +++ b/vendor/github.com/cs3org/reva/v2/internal/grpc/services/gateway/gateway.go @@ -131,7 +131,7 @@ func (c *config) init() { } if c.CreatePersonalSpaceCacheConfig.Store == "" { - c.CreatePersonalSpaceCacheConfig.Store = "noop" + c.CreatePersonalSpaceCacheConfig.Store = "memory" } if c.CreatePersonalSpaceCacheConfig.Database == "" { diff --git a/vendor/github.com/cs3org/reva/v2/internal/grpc/services/publicshareprovider/publicshareprovider.go b/vendor/github.com/cs3org/reva/v2/internal/grpc/services/publicshareprovider/publicshareprovider.go index b987b96acf..4879d7caf4 100644 --- a/vendor/github.com/cs3org/reva/v2/internal/grpc/services/publicshareprovider/publicshareprovider.go +++ b/vendor/github.com/cs3org/reva/v2/internal/grpc/services/publicshareprovider/publicshareprovider.go @@ -554,12 +554,24 @@ func (s *service) UpdatePublicShare(ctx context.Context, req *link.UpdatePublicS } updatePassword := req.GetUpdate().GetType() == link.UpdatePublicShareRequest_Update_TYPE_PASSWORD setPassword := grant.GetPassword() + + // we update permissions with an empty password and password is not set on the public share + emptyPasswordInPermissionUpdate := len(setPassword) == 0 && updatePermissions && !ps.PasswordProtected + + // password is updated, we use the current permissions to check if the user can opt out if updatePassword && !isInternalLink && enforcePassword(canOptOut, ps.GetPermissions().GetPermissions(), s.conf) && len(setPassword) == 0 { return &link.UpdatePublicShareResponse{ Status: status.NewInvalidArg(ctx, "password protection is enforced"), }, nil } + // permissions are updated, we use the new permissions to check if the user can opt out + if emptyPasswordInPermissionUpdate && !isInternalLink && enforcePassword(canOptOut, grant.GetPermissions().GetPermissions(), s.conf) && len(setPassword) == 0 { + return &link.UpdatePublicShareResponse{ + Status: status.NewInvalidArg(ctx, "password protection is enforced"), + }, nil + } + // validate password policy if updatePassword && len(setPassword) > 0 { if err := s.passwordValidator.Validate(setPassword); err != nil { diff --git a/vendor/github.com/cs3org/reva/v2/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/shares.go b/vendor/github.com/cs3org/reva/v2/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/shares.go index 4a46ebaa13..ef88c9acb8 100644 --- a/vendor/github.com/cs3org/reva/v2/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/shares.go +++ b/vendor/github.com/cs3org/reva/v2/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/shares.go @@ -1147,7 +1147,7 @@ func (h *Handler) listSharesWithOthers(w http.ResponseWriter, r *http.Request) { s := r.URL.Query().Get("space") spaceRef := r.URL.Query().Get("space_ref") ctx := r.Context() - sublog := appctx.GetLogger(ctx).With().Str("path", p).Str("space", s).Str("space_ref", spaceRef).Logger() + sublog := appctx.GetLogger(ctx).With().Str("path", p).Str("spaceid", s).Str("space_ref", spaceRef).Logger() if p != "" || s != "" || spaceRef != "" { ref, err := h.extractReference(r) if err != nil { diff --git a/vendor/github.com/cs3org/reva/v2/pkg/events/spaces.go b/vendor/github.com/cs3org/reva/v2/pkg/events/spaces.go index 03ab3ff0a6..c24ac48282 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/events/spaces.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/events/spaces.go @@ -125,6 +125,22 @@ func (SpaceShared) Unmarshal(v []byte) (interface{}, error) { return e, err } +// SpaceShareUpdated is emitted when a space share is updated +type SpaceShareUpdated struct { + Executant *user.UserId + GranteeUserID *user.UserId + GranteeGroupID *group.GroupId + ID *provider.StorageSpaceId + Timestamp time.Time +} + +// Unmarshal to fulfill umarshaller interface +func (SpaceShareUpdated) Unmarshal(v []byte) (interface{}, error) { + ev := SpaceShareUpdated{} + err := json.Unmarshal(v, &ev) + return ev, err +} + // SpaceUnshared is emitted when a space is unshared type SpaceUnshared struct { Executant *user.UserId diff --git a/vendor/github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/spaces/spaces.go b/vendor/github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/spaces/spaces.go index ce585ab542..1e06a2fedc 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/spaces/spaces.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/spaces/spaces.go @@ -80,7 +80,7 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) { var spaceID string spaceID, r.URL.Path = router.ShiftPath(r.URL.Path) - sublog := appctx.GetLogger(ctx).With().Str("datatx", "spaces").Str("space", spaceID).Logger() + sublog := appctx.GetLogger(ctx).With().Str("datatx", "spaces").Str("spaceid", spaceID).Logger() switch r.Method { case "GET", "HEAD": diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/fs/posix/lookup/lookup.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/fs/posix/lookup/lookup.go index d64223f7d4..d08d94fd5f 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/fs/posix/lookup/lookup.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/fs/posix/lookup/lookup.go @@ -199,7 +199,8 @@ func (lu *Lookup) Path(ctx context.Context, n *node.Node, hasPermission node.Per appctx.GetLogger(ctx). Error().Err(err). Str("path", p). - Interface("node", n). + Str("spaceid", n.SpaceID). + Str("nodeid", n.ID). Msg("Path()") return } diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/decomposedfs.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/decomposedfs.go index a9754c0743..9af86b4c43 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -246,9 +246,10 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { for event := range ch { switch ev := event.Event.(type) { case events.PostprocessingFinished: + sublog := log.With().Str("event", "PostprocessingFinished").Str("uploadid", ev.UploadID).Logger() session, err := fs.sessionStore.Get(ctx, ev.UploadID) if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload") + sublog.Error().Err(err).Msg("Failed to get upload") continue // NOTE: since we can't get the upload, we can't delete the blob } @@ -256,11 +257,12 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { n, err := session.Node(ctx) if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not read node") + sublog.Error().Err(err).Msg("could not read node") continue } + sublog = log.With().Str("spaceid", session.SpaceID()).Str("nodeid", session.NodeID()).Logger() if !n.Exists { - log.Debug().Str("uploadID", ev.UploadID).Str("nodeID", session.NodeID()).Msg("node no longer exists") + sublog.Debug().Msg("node no longer exists") fs.sessionStore.Cleanup(ctx, session, false, false, false) continue } @@ -274,7 +276,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { switch ev.Outcome { default: - log.Error().Str("outcome", string(ev.Outcome)).Str("uploadID", ev.UploadID).Msg("unknown postprocessing outcome - aborting") + sublog.Error().Str("outcome", string(ev.Outcome)).Msg("unknown postprocessing outcome - aborting") fallthrough case events.PPOutcomeAbort: failed = true @@ -283,7 +285,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { metrics.UploadSessionsAborted.Inc() case events.PPOutcomeContinue: if err := session.Finalize(); err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not finalize upload") + sublog.Error().Err(err).Msg("could not finalize upload") failed = true revertNodeMetadata = false keepUpload = true @@ -301,7 +303,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { getParent := func() *node.Node { p, err := n.Parent(ctx) if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not read parent") + sublog.Error().Err(err).Msg("could not read parent") return nil } return p @@ -309,15 +311,22 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { now := time.Now() if failed { - // propagate sizeDiff after failed postprocessing - if err := fs.tp.Propagate(ctx, n, -session.SizeDiff()); err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not propagate tree size change") + // if no other upload session is in progress (processing id != session id) or has finished (processing id == "") + latestSession, err := n.ProcessingID(ctx) + if err != nil { + sublog.Error().Err(err).Msg("reading node for session failed") + } + if latestSession == session.ID() { + // propagate reverted sizeDiff after failed postprocessing + if err := fs.tp.Propagate(ctx, n, -session.SizeDiff()); err != nil { + sublog.Error().Err(err).Msg("could not propagate tree size change") + } } } else if p := getParent(); p != nil { // update parent tmtime to propagate etag change after successful postprocessing _ = p.SetTMTime(ctx, &now) if err := fs.tp.Propagate(ctx, p, 0); err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not propagate etag change") + sublog.Error().Err(err).Msg("could not propagate etag change") } } @@ -343,22 +352,24 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { SpaceOwner: n.SpaceOwnerOrManager(ctx), }, ); err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish UploadReady event") + sublog.Error().Err(err).Msg("Failed to publish UploadReady event") } case events.RestartPostprocessing: + sublog := log.With().Str("event", "RestartPostprocessing").Str("uploadid", ev.UploadID).Logger() session, err := fs.sessionStore.Get(ctx, ev.UploadID) if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload") + sublog.Error().Err(err).Msg("Failed to get upload") continue } n, err := session.Node(ctx) if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not read node") + sublog.Error().Err(err).Msg("could not read node") continue } + sublog = log.With().Str("spaceid", session.SpaceID()).Str("nodeid", session.NodeID()).Logger() s, err := session.URL(ctx) if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not create url") + sublog.Error().Err(err).Msg("could not create url") continue } @@ -374,9 +385,10 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { Filename: session.Filename(), Filesize: uint64(session.Size()), }); err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish BytesReceived event") + sublog.Error().Err(err).Msg("Failed to publish BytesReceived event") } case events.PostprocessingStepFinished: + sublog := log.With().Str("event", "PostprocessingStepFinished").Str("uploadid", ev.UploadID).Logger() if ev.FinishedStep != events.PPStepAntivirus { // atm we are only interested in antivirus results continue @@ -388,6 +400,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { // Should we handle this here? continue } + sublog = log.With().Str("scan_description", res.Description).Bool("infected", res.Infected).Logger() var n *node.Node switch ev.UploadID { @@ -473,19 +486,20 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { // uploadid is not empty -> this is an async upload session, err := fs.sessionStore.Get(ctx, ev.UploadID) if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload") + sublog.Error().Err(err).Msg("Failed to get upload") continue } n, err = session.Node(ctx) if err != nil { - log.Error().Err(err).Interface("uploadID", ev.UploadID).Msg("Failed to get node after scan") + sublog.Error().Err(err).Msg("Failed to get node after scan") continue } + sublog = log.With().Str("spaceid", session.SpaceID()).Str("nodeid", session.NodeID()).Logger() } if err := n.SetScanData(ctx, res.Description, res.Scandate); err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Interface("resourceID", res.ResourceID).Msg("Failed to set scan results") + sublog.Error().Err(err).Msg("Failed to set scan results") continue } diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup/lookup.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup/lookup.go index bd432c5fd3..e7a8139839 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup/lookup.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup/lookup.go @@ -211,7 +211,8 @@ func (lu *Lookup) Path(ctx context.Context, n *node.Node, hasPermission node.Per appctx.GetLogger(ctx). Error().Err(err). Str("path", p). - Interface("node", n). + Str("spaceid", n.SpaceID). + Str("nodeid", n.ID). Msg("Path()") return } diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata.go index ec5fed27cb..41a44a75f1 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata.go @@ -42,7 +42,7 @@ func (fs *Decomposedfs) SetArbitraryMetadata(ctx context.Context, ref *provider. if err != nil { return errors.Wrap(err, "Decomposedfs: error resolving ref") } - sublog := appctx.GetLogger(ctx).With().Interface("node", n).Logger() + sublog := appctx.GetLogger(ctx).With().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Logger() if !n.Exists { err = errtypes.NotFound(filepath.Join(n.ParentID, n.Name)) @@ -135,7 +135,7 @@ func (fs *Decomposedfs) UnsetArbitraryMetadata(ctx context.Context, ref *provide if err != nil { return errors.Wrap(err, "Decomposedfs: error resolving ref") } - sublog := appctx.GetLogger(ctx).With().Interface("node", n).Logger() + sublog := appctx.GetLogger(ctx).With().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Logger() if !n.Exists { err = errtypes.NotFound(filepath.Join(n.ParentID, n.Name)) diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/node.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/node.go index f1db7f5eb8..48432f2241 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/node.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/node.go @@ -513,7 +513,7 @@ func (n *Node) readOwner(ctx context.Context) (*userpb.UserId, error) { func (n *Node) PermissionSet(ctx context.Context) (provider.ResourcePermissions, bool) { u, ok := ctxpkg.ContextGetUser(ctx) if !ok { - appctx.GetLogger(ctx).Debug().Interface("node", n).Msg("no user in context, returning default permissions") + appctx.GetLogger(ctx).Debug().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Msg("no user in context, returning default permissions") return NoPermissions(), false } if utils.UserEqual(u.Id, n.SpaceRoot.Owner()) { @@ -582,7 +582,7 @@ func (n *Node) SetMtime(ctx context.Context, t *time.Time) (err error) { // SetEtag sets the temporary etag of a node if it differs from the current etag func (n *Node) SetEtag(ctx context.Context, val string) (err error) { - sublog := appctx.GetLogger(ctx).With().Interface("node", n).Logger() + sublog := appctx.GetLogger(ctx).With().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Logger() var tmTime time.Time if tmTime, err = n.GetTMTime(ctx); err != nil { return @@ -635,7 +635,7 @@ func (n *Node) IsDir(ctx context.Context) bool { // AsResourceInfo return the node as CS3 ResourceInfo func (n *Node) AsResourceInfo(ctx context.Context, rp *provider.ResourcePermissions, mdKeys, fieldMask []string, returnBasename bool) (ri *provider.ResourceInfo, err error) { - sublog := appctx.GetLogger(ctx).With().Interface("node", n.ID).Logger() + sublog := appctx.GetLogger(ctx).With().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Logger() var fn string nodeType := n.Type(ctx) @@ -844,9 +844,9 @@ func (n *Node) readChecksumIntoResourceChecksum(ctx context.Context, algo string Sum: hex.EncodeToString(v), } case metadata.IsAttrUnset(err): - appctx.GetLogger(ctx).Debug().Err(err).Str("nodepath", n.InternalPath()).Str("algorithm", algo).Msg("checksum not set") + appctx.GetLogger(ctx).Debug().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Str("nodepath", n.InternalPath()).Str("algorithm", algo).Msg("checksum not set") default: - appctx.GetLogger(ctx).Error().Err(err).Str("nodepath", n.InternalPath()).Str("algorithm", algo).Msg("could not read checksum") + appctx.GetLogger(ctx).Error().Err(err).Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Str("nodepath", n.InternalPath()).Str("algorithm", algo).Msg("could not read checksum") } } @@ -864,9 +864,9 @@ func (n *Node) readChecksumIntoOpaque(ctx context.Context, algo string, ri *prov Value: []byte(hex.EncodeToString(v)), } case metadata.IsAttrUnset(err): - appctx.GetLogger(ctx).Debug().Err(err).Str("nodepath", n.InternalPath()).Str("algorithm", algo).Msg("checksum not set") + appctx.GetLogger(ctx).Debug().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Str("nodepath", n.InternalPath()).Str("algorithm", algo).Msg("checksum not set") default: - appctx.GetLogger(ctx).Error().Err(err).Str("nodepath", n.InternalPath()).Str("algorithm", algo).Msg("could not read checksum") + appctx.GetLogger(ctx).Error().Err(err).Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Str("nodepath", n.InternalPath()).Str("algorithm", algo).Msg("could not read checksum") } } @@ -891,12 +891,12 @@ func (n *Node) readQuotaIntoOpaque(ctx context.Context, ri *provider.ResourceInf Value: []byte(v), } } else { - appctx.GetLogger(ctx).Error().Err(err).Str("nodepath", n.InternalPath()).Str("quota", v).Msg("malformed quota") + appctx.GetLogger(ctx).Error().Err(err).Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Str("nodepath", n.InternalPath()).Str("quota", v).Msg("malformed quota") } case metadata.IsAttrUnset(err): - appctx.GetLogger(ctx).Debug().Err(err).Str("nodepath", n.InternalPath()).Msg("quota not set") + appctx.GetLogger(ctx).Debug().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Str("nodepath", n.InternalPath()).Msg("quota not set") default: - appctx.GetLogger(ctx).Error().Err(err).Str("nodepath", n.InternalPath()).Msg("could not read quota") + appctx.GetLogger(ctx).Error().Err(err).Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Str("nodepath", n.InternalPath()).Msg("could not read quota") } } @@ -1014,7 +1014,7 @@ func isGrantExpired(g *provider.Grant) bool { func (n *Node) ReadUserPermissions(ctx context.Context, u *userpb.User) (ap provider.ResourcePermissions, accessDenied bool, err error) { // check if the current user is the owner if utils.UserEqual(u.Id, n.Owner()) { - appctx.GetLogger(ctx).Debug().Str("node", n.ID).Msg("user is owner, returning owner permissions") + appctx.GetLogger(ctx).Debug().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Msg("user is owner, returning owner permissions") return OwnerPermissions(), false, nil } @@ -1032,7 +1032,7 @@ func (n *Node) ReadUserPermissions(ctx context.Context, u *userpb.User) (ap prov // we read all grantees from the node var grantees []string if grantees, err = n.ListGrantees(ctx); err != nil { - appctx.GetLogger(ctx).Error().Err(err).Interface("node", n).Msg("error listing grantees") + appctx.GetLogger(ctx).Error().Err(err).Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Msg("error listing grantees") return NoPermissions(), true, err } @@ -1073,15 +1073,15 @@ func (n *Node) ReadUserPermissions(ctx context.Context, u *userpb.User) (ap prov } AddPermissions(&ap, g.GetPermissions()) case metadata.IsAttrUnset(err): - appctx.GetLogger(ctx).Error().Interface("node", n).Str("grant", grantees[i]).Interface("grantees", grantees).Msg("grant vanished from node after listing") + appctx.GetLogger(ctx).Error().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Str("grant", grantees[i]).Interface("grantees", grantees).Msg("grant vanished from node after listing") // continue with next segment default: - appctx.GetLogger(ctx).Error().Err(err).Interface("node", n).Str("grant", grantees[i]).Msg("error reading permissions") + appctx.GetLogger(ctx).Error().Err(err).Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Str("grant", grantees[i]).Msg("error reading permissions") // continue with next segment } } - appctx.GetLogger(ctx).Debug().Interface("permissions", ap).Interface("node", n).Interface("user", u).Msg("returning aggregated permissions") + appctx.GetLogger(ctx).Debug().Interface("permissions", ap).Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Interface("user", u).Msg("returning aggregated permissions") return ap, false, nil } @@ -1135,7 +1135,7 @@ func (n *Node) IsDenied(ctx context.Context) bool { func (n *Node) ListGrantees(ctx context.Context) (grantees []string, err error) { attrs, err := n.Xattrs(ctx) if err != nil { - appctx.GetLogger(ctx).Error().Err(err).Str("node", n.ID).Msg("error listing attributes") + appctx.GetLogger(ctx).Error().Err(err).Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Msg("error listing attributes") return nil, err } for name := range attrs { @@ -1202,7 +1202,8 @@ func (n *Node) ListGrants(ctx context.Context) ([]*provider.Grant, error) { appctx.GetLogger(ctx). Error(). Err(err). - Str("node", n.ID). + Str("spaceid", n.SpaceID). + Str("nodeid", n.ID). Str("grantee", g). Msg("error reading grant") continue @@ -1276,6 +1277,12 @@ func (n *Node) IsProcessing(ctx context.Context) bool { return err == nil && strings.HasPrefix(v, ProcessingStatus) } +// ProcessingID returns the latest upload session id +func (n *Node) ProcessingID(ctx context.Context) (string, error) { + v, err := n.XattrString(ctx, prefixes.StatusPrefix) + return strings.TrimPrefix(v, ProcessingStatus), err +} + // IsSpaceRoot checks if the node is a space root func (n *Node) IsSpaceRoot(ctx context.Context) bool { _, err := n.Xattr(ctx, prefixes.SpaceNameAttr) diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/permissions.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/permissions.go index 4e3459a946..c8367ef859 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/permissions.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/permissions.go @@ -168,7 +168,7 @@ func (p *Permissions) assemblePermissions(ctx context.Context, n *Node, failOnTr } AddPermissions(&ap, &np) } else { - appctx.GetLogger(ctx).Error().Err(err).Interface("node", cn.ID).Msg("error reading permissions") + appctx.GetLogger(ctx).Error().Err(err).Str("spaceid", cn.SpaceID).Str("nodeid", cn.ID).Msg("error reading permissions") // continue with next segment } @@ -194,7 +194,7 @@ func (p *Permissions) assemblePermissions(ctx context.Context, n *Node, failOnTr } AddPermissions(&ap, &np) } else { - appctx.GetLogger(ctx).Error().Err(err).Interface("node", cn.ID).Msg("error reading root node permissions") + appctx.GetLogger(ctx).Error().Err(err).Str("spaceid", cn.SpaceID).Str("nodeid", cn.ID).Msg("error reading root node permissions") } // check if the current user is the owner @@ -202,7 +202,7 @@ func (p *Permissions) assemblePermissions(ctx context.Context, n *Node, failOnTr return OwnerPermissions(), nil } - appctx.GetLogger(ctx).Debug().Interface("permissions", ap).Interface("node", n.ID).Interface("user", u).Msg("returning agregated permissions") + appctx.GetLogger(ctx).Debug().Interface("permissions", ap).Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Interface("user", u).Msg("returning agregated permissions") return ap, nil } diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/recycle.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/recycle.go index 72c2b22640..aa0ead067b 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/recycle.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/recycle.go @@ -57,7 +57,7 @@ func (fs *Decomposedfs) ListRecycle(ctx context.Context, ref *provider.Reference } spaceID := ref.ResourceId.OpaqueId - sublog := appctx.GetLogger(ctx).With().Str("space", spaceID).Str("key", key).Str("relative_path", relativePath).Logger() + sublog := appctx.GetLogger(ctx).With().Str("spaceid", spaceID).Str("key", key).Str("relative_path", relativePath).Logger() // check permissions trashnode, err := fs.lu.NodeFromSpaceID(ctx, spaceID) @@ -98,7 +98,7 @@ func (fs *Decomposedfs) ListRecycle(ctx context.Context, ref *provider.Reference if attrBytes, ok := attrs[prefixes.TrashOriginAttr]; ok { origin = string(attrBytes) } else { - sublog.Error().Err(err).Str("space", spaceID).Msg("could not read origin path, skipping") + sublog.Error().Err(err).Str("spaceid", spaceID).Msg("could not read origin path, skipping") return nil, err } @@ -291,14 +291,14 @@ func (fs *Decomposedfs) listTrashRoot(ctx context.Context, spaceID string) ([]*p // TODO nanos } } else { - log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not parse time format, ignoring") + log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("spaceid", spaceID).Str("nodeid", nodeID).Str("dtime", timeSuffix).Msg("could not parse time format, ignoring") } // lookup origin path in extended attributes if attr, ok := attrs[prefixes.TrashOriginAttr]; ok { item.Ref = &provider.Reference{Path: string(attr)} } else { - log.Error().Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not read origin path") + log.Error().Str("trashRoot", trashRoot).Str("item", itemPath).Str("spaceid", spaceID).Str("nodeid", nodeID).Str("dtime", timeSuffix).Msg("could not read origin path") } select { case results <- item: diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/spaces.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/spaces.go index 0c1f541f8b..6a555ea12f 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/spaces.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/spaces.go @@ -808,7 +808,7 @@ func (fs *Decomposedfs) storageSpaceFromNode(ctx context.Context, n *node.Node, } } - sublog := appctx.GetLogger(ctx).With().Str("space", n.SpaceRoot.ID).Logger() + sublog := appctx.GetLogger(ctx).With().Str("spaceid", n.SpaceID).Logger() var err error // TODO apply more filters diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload.go index 02f74ce262..03c757b988 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload.go @@ -223,7 +223,7 @@ func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Refere } } - log.Debug().Interface("session", session).Interface("node", n).Interface("metadata", metadata).Msg("Decomposedfs: resolved filename") + log.Debug().Str("uploadid", session.ID()).Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Interface("metadata", metadata).Msg("Decomposedfs: resolved filename") _, err = node.CheckQuota(ctx, n.SpaceRoot, n.Exists, uint64(n.Blobsize), uint64(session.Size())) if err != nil { diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/store.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/store.go index 57a7aea8f4..b880b39baa 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/store.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/store.go @@ -205,15 +205,23 @@ func (store OcisStore) CreateNodeForUpload(session *OcisSession, initAttrs node. } var f *lockedfile.File - if session.NodeExists() { + if session.NodeExists() { // TODO this is wrong. The node should be created when the upload starts, the revisions should be created independently of the node + // we do not need to propagate a change when a node is created, only when the upload is ready. + // that still creates problems for desktop clients because if another change causes propagation it will detects an empty file + // so the first upload has to point to the first revision with the expected size. The file cannot be downloaded, but it can be overwritten (which will create a new revision and make the node reflect the latest revision) + // any finished postprocessing will not affect the node metadata. + // *thinking* but then initializing an upload will lock the file until the upload has finished. That sucks. + // so we have to check if the node has been created meanwhile (well, only in case the upload does not know the nodeid ... or the NodeExists array that is checked by session.NodeExists()) + // FIXME look at the disk again to see if the file has been created in between, or just try initializing a new node and do the update existing node as a fallback. <- the latter! + f, err = store.updateExistingNode(ctx, session, n, session.SpaceID(), uint64(session.Size())) if f != nil { - appctx.GetLogger(ctx).Info().Str("lockfile", f.Name()).Interface("err", err).Msg("got lock file from updateExistingNode") + appctx.GetLogger(ctx).Debug().Str("lockfile", f.Name()).Interface("err", err).Msg("got lock file from updateExistingNode") } } else { f, err = store.initNewNode(ctx, session, n, uint64(session.Size())) if f != nil { - appctx.GetLogger(ctx).Info().Str("lockfile", f.Name()).Interface("err", err).Msg("got lock file from initNewNode") + appctx.GetLogger(ctx).Debug().Str("lockfile", f.Name()).Interface("err", err).Msg("got lock file from initNewNode") } } defer func() { diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/upload.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/upload.go index 9244052a10..fe2861e556 100644 --- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/upload.go +++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/upload.go @@ -278,14 +278,12 @@ func (session *OcisSession) ConcatUploads(_ context.Context, uploads []tusd.Uplo func (session *OcisSession) Finalize() (err error) { ctx, span := tracer.Start(session.Context(context.Background()), "Finalize") defer span.End() - n, err := session.Node(ctx) - if err != nil { - return err - } + + revisionNode := &node.Node{SpaceID: session.SpaceID(), BlobID: session.ID(), Blobsize: session.Size()} // upload the data to the blobstore _, subspan := tracer.Start(ctx, "WriteBlob") - err = session.store.tp.WriteBlob(n, session.binPath()) + err = session.store.tp.WriteBlob(revisionNode, session.binPath()) subspan.End() if err != nil { return errors.Wrap(err, "failed to upload file to blobstore") @@ -318,12 +316,12 @@ func (session *OcisSession) Cleanup(revertNodeMetadata, cleanBin, cleanInfo bool ctx := session.Context(context.Background()) if revertNodeMetadata { + n, err := session.Node(ctx) + if err != nil { + appctx.GetLogger(ctx).Error().Err(err).Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Str("uploadid", session.ID()).Msg("reading node for session failed") + } if session.NodeExists() { p := session.info.MetaData["versionsPath"] - n, err := session.Node(ctx) - if err != nil { - appctx.GetLogger(ctx).Error().Err(err).Str("sessionid", session.ID()).Msg("reading node for session failed") - } if err := session.store.lu.CopyMetadata(ctx, p, n.InternalPath(), func(attributeName string, value []byte) (newValue []byte, copy bool) { return value, strings.HasPrefix(attributeName, prefixes.ChecksumPrefix) || attributeName == prefixes.TypeAttr || @@ -339,7 +337,16 @@ func (session *OcisSession) Cleanup(revertNodeMetadata, cleanBin, cleanInfo bool } } else { - session.removeNode(ctx) + // if no other upload session is in progress (processing id != session id) or has finished (processing id == "") + latestSession, err := n.ProcessingID(ctx) + if err != nil { + appctx.GetLogger(ctx).Error().Err(err).Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Str("uploadid", session.ID()).Msg("reading processingid for session failed") + } + if latestSession == session.ID() { + // actually delete the node + session.removeNode(ctx) + } + // FIXME else if the upload has become a revision, delete the revision, or if it is the last one, delete the node } } diff --git a/vendor/modules.txt b/vendor/modules.txt index d896e9457a..7046c04814 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -366,7 +366,7 @@ github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1 github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1 github.com/cs3org/go-cs3apis/cs3/tx/v1beta1 github.com/cs3org/go-cs3apis/cs3/types/v1beta1 -# github.com/cs3org/reva/v2 v2.19.2-0.20240405190914-ef59ba20ef0e +# github.com/cs3org/reva/v2 v2.19.2-0.20240415154646-5c2c9831a2b7 ## explicit; go 1.21 github.com/cs3org/reva/v2/cmd/revad/internal/grace github.com/cs3org/reva/v2/cmd/revad/runtime From 368ca883db6336566b98ed35baebea0f6ef23170 Mon Sep 17 00:00:00 2001 From: jkoberg Date: Mon, 15 Apr 2024 17:49:20 +0200 Subject: [PATCH 5/5] feat(clientlog): space-share-updated sse Signed-off-by: jkoberg --- services/clientlog/pkg/command/server.go | 1 + services/clientlog/pkg/service/service.go | 3 +++ 2 files changed, 4 insertions(+) diff --git a/services/clientlog/pkg/command/server.go b/services/clientlog/pkg/command/server.go index 044a66e8b2..7c41e7a36e 100644 --- a/services/clientlog/pkg/command/server.go +++ b/services/clientlog/pkg/command/server.go @@ -34,6 +34,7 @@ var _registeredEvents = []events.Unmarshaller{ events.FileUnlocked{}, events.FileTouched{}, events.SpaceShared{}, + events.SpaceShareUpdated{}, events.SpaceUnshared{}, events.ShareCreated{}, events.ShareRemoved{}, diff --git a/services/clientlog/pkg/service/service.go b/services/clientlog/pkg/service/service.go index a194d6af0d..ab7bf7cc4a 100644 --- a/services/clientlog/pkg/service/service.go +++ b/services/clientlog/pkg/service/service.go @@ -160,6 +160,9 @@ func (cl *ClientlogService) processEvent(event events.Event) { case events.SpaceShared: r, _ := storagespace.ParseReference(e.ID.GetOpaqueId()) p("space-member-added", &r) + case events.SpaceShareUpdated: + r, _ := storagespace.ParseReference(e.ID.GetOpaqueId()) + p("space-share-updated", &r) case events.SpaceUnshared: r, _ := storagespace.ParseReference(e.ID.GetOpaqueId()) p("space-member-removed", &r)