Bump reva to pull in the latest changes

This commit is contained in:
André Duffeck
2024-01-16 08:06:23 +01:00
parent 2253096609
commit b2db0c7902
16 changed files with 238 additions and 177 deletions

2
go.mod
View File

@@ -13,7 +13,7 @@ require (
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/coreos/go-oidc/v3 v3.9.0
github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781
github.com/cs3org/reva/v2 v2.18.1-0.20240104084554-e85441869c2b
github.com/cs3org/reva/v2 v2.18.1-0.20240115094008-bde86a38bd77
github.com/dhowden/tag v0.0.0-20230630033851-978a0926ee25
github.com/disintegration/imaging v1.6.2
github.com/dutchcoders/go-clamd v0.0.0-20170520113014-b970184f4d9e

4
go.sum
View File

@@ -1018,8 +1018,8 @@ github.com/crewjam/saml v0.4.14 h1:g9FBNx62osKusnFzs3QTN5L9CVA/Egfgm+stJShzw/c=
github.com/crewjam/saml v0.4.14/go.mod h1:UVSZCf18jJkk6GpWNVqcyQJMD5HsRugBPf4I1nl2mME=
github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781 h1:BUdwkIlf8IS2FasrrPg8gGPHQPOrQ18MS1Oew2tmGtY=
github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
github.com/cs3org/reva/v2 v2.18.1-0.20240104084554-e85441869c2b h1:Nh0SZn2MyCWC/gmV6le7e9eVzux9WWGPQ/nECgh/gyg=
github.com/cs3org/reva/v2 v2.18.1-0.20240104084554-e85441869c2b/go.mod h1:QW31Q1IQ9ZCJMFv3u8/SdHSyLfCcSVNcRbqIJj+Y+7o=
github.com/cs3org/reva/v2 v2.18.1-0.20240115094008-bde86a38bd77 h1:wDi6MOBGdd9zyqDSwhm2FBYqkiEVgDUevpMBKQ6zSf4=
github.com/cs3org/reva/v2 v2.18.1-0.20240115094008-bde86a38bd77/go.mod h1:plMbmaHczZbP+1rtV56YCYs5lkmpdRNpj0KZb9BWLus=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
github.com/cyphar/filepath-securejoin v0.2.4 h1:Ugdm7cg7i6ZK6x3xDF1oEu1nfkyfH53EtKeQYTC3kyg=
github.com/cyphar/filepath-securejoin v0.2.4/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4=

View File

@@ -25,6 +25,7 @@ import (
"strings"
"github.com/cs3org/reva/v2/pkg/storagespace"
"google.golang.org/genproto/protobuf/field_mask"
"google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
gstatus "google.golang.org/grpc/status"
@@ -397,7 +398,7 @@ func (s *service) ListStorageSpaces(ctx context.Context, req *provider.ListStora
var shareInfo map[string]*provider.ResourceInfo
var err error
if fetchShares {
receivedShares, shareInfo, err = s.fetchShares(ctx)
receivedShares, shareInfo, err = s.fetchShares(ctx, req.Opaque, []string{}, &fieldmaskpb.FieldMask{ /*TODO mtime and etag only?*/ })
if err != nil {
return nil, errors.Wrap(err, "sharesstorageprovider: error calling ListReceivedSharesRequest")
}
@@ -708,7 +709,7 @@ func (s *service) Stat(ctx context.Context, req *provider.StatRequest) (*provide
if !ok {
return nil, fmt.Errorf("missing user in context")
}
receivedShares, shareMd, err := s.fetchShares(ctx)
receivedShares, shareMd, err := s.fetchShares(ctx, req.Opaque, req.ArbitraryMetadataKeys, req.FieldMask)
if err != nil {
return nil, err
}
@@ -804,7 +805,7 @@ func (s *service) ListContainer(ctx context.Context, req *provider.ListContainer
// The root is empty, it is filled by mountpoints
// so, when accessing the root via /dav/spaces, we need to list the accepted shares with their mountpoint
receivedShares, _, err := s.fetchShares(ctx)
receivedShares, shareMd, err := s.fetchShares(ctx, req.Opaque, req.ArbitraryMetadataKeys, req.FieldMask)
if err != nil {
return nil, errors.Wrap(err, "sharesstorageprovider: error calling ListReceivedSharesRequest")
}
@@ -820,31 +821,38 @@ func (s *service) ListContainer(ctx context.Context, req *provider.ListContainer
continue
}
statRes, err := gatewayClient.Stat(ctx, &provider.StatRequest{
Opaque: req.Opaque,
Ref: &provider.Reference{
ResourceId: share.Share.ResourceId,
Path: ".",
},
ArbitraryMetadataKeys: req.ArbitraryMetadataKeys,
})
switch {
case err != nil:
appctx.GetLogger(ctx).Error().
Err(err).
Interface("share", share).
Msg("sharesstorageprovider: could not make stat request when listing virtual root, skipping")
continue
case statRes.Status.Code != rpc.Code_CODE_OK:
appctx.GetLogger(ctx).Debug().
Interface("share", share).
Interface("status", statRes.Status).
Msg("sharesstorageprovider: could not stat share when listing virtual root, skipping")
continue
info := shareMd[share.GetShare().GetId().GetOpaqueId()]
if info == nil {
if share.GetShare().GetResourceId().GetSpaceId() == "" {
// convert backwards compatible share id
share.Share.ResourceId.StorageId, share.Share.ResourceId.SpaceId = storagespace.SplitStorageID(share.GetShare().GetResourceId().GetSpaceId())
}
statRes, err := gatewayClient.Stat(ctx, &provider.StatRequest{
Opaque: req.Opaque,
Ref: &provider.Reference{
ResourceId: share.Share.ResourceId,
Path: ".",
},
ArbitraryMetadataKeys: req.ArbitraryMetadataKeys,
})
switch {
case err != nil:
appctx.GetLogger(ctx).Error().
Err(err).
Interface("share", share).
Msg("sharesstorageprovider: could not make stat request when listing virtual root, skipping")
continue
case statRes.Status.Code != rpc.Code_CODE_OK:
appctx.GetLogger(ctx).Debug().
Interface("share", share).
Interface("status", statRes.Status).
Msg("sharesstorageprovider: could not stat share when listing virtual root, skipping")
continue
}
info = statRes.Info
}
// override info
info := statRes.Info
// override resource id info
info.Id = &provider.ResourceId{
StorageId: utils.ShareStorageProviderID,
SpaceId: utils.ShareStorageSpaceID,
@@ -1067,7 +1075,13 @@ func (s *service) resolveAcceptedShare(ctx context.Context, ref *provider.Refere
// look up share for this resourceid
lsRes, err := sharingCollaborationClient.ListReceivedShares(ctx, &collaboration.ListReceivedSharesRequest{
Filters: []*collaboration.Filter{
// FIXME filter by accepted ... and by mountpoint?
{
Type: collaboration.Filter_TYPE_STATE,
Term: &collaboration.Filter_State{
State: collaboration.ShareState_SHARE_STATE_ACCEPTED,
},
},
// TODO filter by mountpoint?
},
})
if err != nil {
@@ -1077,6 +1091,7 @@ func (s *service) resolveAcceptedShare(ctx context.Context, ref *provider.Refere
return nil, lsRes.Status, nil
}
for _, receivedShare := range lsRes.Shares {
// make sure to skip unaccepted shares
if receivedShare.State != collaboration.ShareState_SHARE_STATE_ACCEPTED {
continue
}
@@ -1121,7 +1136,7 @@ func (s *service) rejectReceivedShare(ctx context.Context, receivedShare *collab
return errtypes.NewErrtypeFromStatus(res.Status)
}
func (s *service) fetchShares(ctx context.Context) ([]*collaboration.ReceivedShare, map[string]*provider.ResourceInfo, error) {
func (s *service) fetchShares(ctx context.Context, opaque *typesv1beta1.Opaque, arbitraryMetadataKeys []string, fieldMask *field_mask.FieldMask) ([]*collaboration.ReceivedShare, map[string]*provider.ResourceInfo, error) {
sharingCollaborationClient, err := s.sharingCollaborationSelector.Next()
if err != nil {
return nil, nil, err
@@ -1152,7 +1167,12 @@ func (s *service) fetchShares(ctx context.Context) ([]*collaboration.ReceivedSha
// convert backwards compatible share id
rs.Share.ResourceId.StorageId, rs.Share.ResourceId.SpaceId = storagespace.SplitStorageID(rs.Share.ResourceId.StorageId)
}
sRes, err := gatewayClient.Stat(ctx, &provider.StatRequest{Ref: &provider.Reference{ResourceId: rs.Share.ResourceId}})
sRes, err := gatewayClient.Stat(ctx, &provider.StatRequest{
Opaque: opaque,
Ref: &provider.Reference{ResourceId: rs.Share.ResourceId},
ArbitraryMetadataKeys: arbitraryMetadataKeys,
FieldMask: fieldMask,
})
if err != nil {
appctx.GetLogger(ctx).Error().
Err(err).

View File

@@ -702,13 +702,27 @@ func (s *service) Delete(ctx context.Context, req *provider.DeleteRequest) (*pro
}
}
md, err := s.storage.GetMD(ctx, req.Ref, []string{}, []string{"id"})
md, err := s.storage.GetMD(ctx, req.Ref, []string{}, []string{"id", "status"})
if err != nil {
return &provider.DeleteResponse{
Status: status.NewStatusFromErrType(ctx, "can't stat resource to delete", err),
}, nil
}
if utils.ReadPlainFromOpaque(md.GetOpaque(), "status") == "processing" {
return &provider.DeleteResponse{
Status: &rpc.Status{
Code: rpc.Code_CODE_UNAVAILABLE,
Message: "file is processing",
},
Opaque: &typesv1beta1.Opaque{
Map: map[string]*typesv1beta1.OpaqueEntry{
"status": {Decoder: "plain", Value: []byte("processing")},
},
},
}, nil
}
err = s.storage.Delete(ctx, req.Ref)
return &provider.DeleteResponse{

View File

@@ -292,58 +292,58 @@ func (s *svc) handlePut(ctx context.Context, w http.ResponseWriter, r *http.Requ
}
// ony send actual PUT request if file has bytes. Otherwise the initiate file upload request creates the file
// if length != 0 { // FIXME bring back 0 byte file upload handling, see https://github.com/owncloud/ocis/issues/2609
var ep, token string
for _, p := range uRes.Protocols {
if p.Protocol == "simple" {
ep, token = p.UploadEndpoint, p.Token
if length != 0 {
var ep, token string
for _, p := range uRes.Protocols {
if p.Protocol == "simple" {
ep, token = p.UploadEndpoint, p.Token
}
}
}
httpReq, err := rhttp.NewRequest(ctx, http.MethodPut, ep, r.Body)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
Propagator.Inject(ctx, propagation.HeaderCarrier(httpReq.Header))
httpReq.Header.Set(datagateway.TokenTransportHeader, token)
httpRes, err := s.client.Do(httpReq)
if err != nil {
log.Error().Err(err).Msg("error doing PUT request to data service")
w.WriteHeader(http.StatusInternalServerError)
return
}
defer httpRes.Body.Close()
if httpRes.StatusCode != http.StatusOK {
if httpRes.StatusCode == http.StatusPartialContent {
w.WriteHeader(http.StatusPartialContent)
httpReq, err := rhttp.NewRequest(ctx, http.MethodPut, ep, r.Body)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
if httpRes.StatusCode == errtypes.StatusChecksumMismatch {
w.WriteHeader(http.StatusBadRequest)
b, err := errors.Marshal(http.StatusBadRequest, "The computed checksum does not match the one received from the client.", "")
errors.HandleWebdavError(&log, w, b, err)
Propagator.Inject(ctx, propagation.HeaderCarrier(httpReq.Header))
httpReq.Header.Set(datagateway.TokenTransportHeader, token)
httpRes, err := s.client.Do(httpReq)
if err != nil {
log.Error().Err(err).Msg("error doing PUT request to data service")
w.WriteHeader(http.StatusInternalServerError)
return
}
defer httpRes.Body.Close()
if httpRes.StatusCode != http.StatusOK {
if httpRes.StatusCode == http.StatusPartialContent {
w.WriteHeader(http.StatusPartialContent)
return
}
if httpRes.StatusCode == errtypes.StatusChecksumMismatch {
w.WriteHeader(http.StatusBadRequest)
b, err := errors.Marshal(http.StatusBadRequest, "The computed checksum does not match the one received from the client.", "")
errors.HandleWebdavError(&log, w, b, err)
return
}
log.Error().Err(err).Msg("PUT request to data server failed")
w.WriteHeader(httpRes.StatusCode)
return
}
log.Error().Err(err).Msg("PUT request to data server failed")
w.WriteHeader(httpRes.StatusCode)
return
}
// copy headers if they are present
if httpRes.Header.Get(net.HeaderETag) != "" {
w.Header().Set(net.HeaderETag, httpRes.Header.Get(net.HeaderETag))
}
if httpRes.Header.Get(net.HeaderOCETag) != "" {
w.Header().Set(net.HeaderOCETag, httpRes.Header.Get(net.HeaderOCETag))
}
if httpRes.Header.Get(net.HeaderOCFileID) != "" {
w.Header().Set(net.HeaderOCFileID, httpRes.Header.Get(net.HeaderOCFileID))
}
if httpRes.Header.Get(net.HeaderLastModified) != "" {
w.Header().Set(net.HeaderLastModified, httpRes.Header.Get(net.HeaderLastModified))
// copy headers if they are present
if httpRes.Header.Get(net.HeaderETag) != "" {
w.Header().Set(net.HeaderETag, httpRes.Header.Get(net.HeaderETag))
}
if httpRes.Header.Get(net.HeaderOCETag) != "" {
w.Header().Set(net.HeaderOCETag, httpRes.Header.Get(net.HeaderOCETag))
}
if httpRes.Header.Get(net.HeaderOCFileID) != "" {
w.Header().Set(net.HeaderOCFileID, httpRes.Header.Get(net.HeaderOCFileID))
}
if httpRes.Header.Get(net.HeaderLastModified) != "" {
w.Header().Set(net.HeaderLastModified, httpRes.Header.Get(net.HeaderLastModified))
}
}
// file was new

View File

@@ -252,56 +252,58 @@ func (s *svc) handleTusPost(ctx context.Context, w http.ResponseWriter, r *http.
// for creation-with-upload extension forward bytes to dataprovider
// TODO check this really streams
if r.Header.Get(net.HeaderContentType) == "application/offset+octet-stream" {
length, err := strconv.ParseInt(r.Header.Get(net.HeaderContentLength), 10, 64)
if err != nil {
log.Debug().Err(err).Msg("wrong request")
w.WriteHeader(http.StatusBadRequest)
return
}
var httpRes *http.Response
finishUpload := true
if uploadLength > 0 {
var httpRes *http.Response
httpReq, err := rhttp.NewRequest(ctx, http.MethodPatch, ep, r.Body)
if err != nil {
log.Debug().Err(err).Msg("wrong request")
w.WriteHeader(http.StatusInternalServerError)
return
}
Propagator.Inject(ctx, propagation.HeaderCarrier(httpReq.Header))
httpReq, err := rhttp.NewRequest(ctx, http.MethodPatch, ep, r.Body)
if err != nil {
log.Debug().Err(err).Msg("wrong request")
w.WriteHeader(http.StatusInternalServerError)
return
}
Propagator.Inject(ctx, propagation.HeaderCarrier(httpReq.Header))
httpReq.Header.Set(net.HeaderContentType, r.Header.Get(net.HeaderContentType))
httpReq.Header.Set(net.HeaderContentLength, r.Header.Get(net.HeaderContentLength))
if r.Header.Get(net.HeaderUploadOffset) != "" {
httpReq.Header.Set(net.HeaderUploadOffset, r.Header.Get(net.HeaderUploadOffset))
} else {
httpReq.Header.Set(net.HeaderUploadOffset, "0")
}
httpReq.Header.Set(net.HeaderTusResumable, r.Header.Get(net.HeaderTusResumable))
httpReq.Header.Set(net.HeaderContentType, r.Header.Get(net.HeaderContentType))
httpReq.Header.Set(net.HeaderContentLength, r.Header.Get(net.HeaderContentLength))
if r.Header.Get(net.HeaderUploadOffset) != "" {
httpReq.Header.Set(net.HeaderUploadOffset, r.Header.Get(net.HeaderUploadOffset))
} else {
httpReq.Header.Set(net.HeaderUploadOffset, "0")
}
httpReq.Header.Set(net.HeaderTusResumable, r.Header.Get(net.HeaderTusResumable))
httpRes, err = s.client.Do(httpReq)
if err != nil {
log.Error().Err(err).Msg("error doing PATCH request to data gateway")
w.WriteHeader(http.StatusInternalServerError)
return
}
defer httpRes.Body.Close()
httpRes, err = s.client.Do(httpReq)
if err != nil || httpRes == nil {
log.Error().Err(err).Msg("error doing PATCH request to data gateway")
w.WriteHeader(http.StatusInternalServerError)
return
}
defer httpRes.Body.Close()
w.Header().Set(net.HeaderUploadOffset, httpRes.Header.Get(net.HeaderUploadOffset))
w.Header().Set(net.HeaderTusResumable, httpRes.Header.Get(net.HeaderTusResumable))
w.Header().Set(net.HeaderTusUploadExpires, httpRes.Header.Get(net.HeaderTusUploadExpires))
if httpRes.StatusCode != http.StatusNoContent {
w.WriteHeader(httpRes.StatusCode)
return
}
if httpRes.StatusCode != http.StatusNoContent {
w.WriteHeader(httpRes.StatusCode)
return
}
// check if upload was fully completed
if length == 0 || httpRes.Header.Get(net.HeaderUploadOffset) == r.Header.Get(net.HeaderUploadLength) {
// get uploaded file metadata
w.Header().Set(net.HeaderUploadOffset, httpRes.Header.Get(net.HeaderUploadOffset))
w.Header().Set(net.HeaderTusResumable, httpRes.Header.Get(net.HeaderTusResumable))
w.Header().Set(net.HeaderTusUploadExpires, httpRes.Header.Get(net.HeaderTusUploadExpires))
if httpRes.Header.Get(net.HeaderOCMtime) != "" {
w.Header().Set(net.HeaderOCMtime, httpRes.Header.Get(net.HeaderOCMtime))
}
if resid, err := storagespace.ParseID(httpRes.Header.Get(net.HeaderOCFileID)); err == nil {
sReq.Ref = &provider.Reference{
ResourceId: &resid,
}
}
finishUpload = httpRes.Header.Get(net.HeaderUploadOffset) == r.Header.Get(net.HeaderUploadLength)
}
// check if upload was fully completed
if uploadLength == 0 || finishUpload {
// get uploaded file metadata
sRes, err := client.Stat(ctx, sReq)
if err != nil {
@@ -311,7 +313,6 @@ func (s *svc) handleTusPost(ctx context.Context, w http.ResponseWriter, r *http.
}
if sRes.Status.Code != rpc.Code_CODE_OK && sRes.Status.Code != rpc.Code_CODE_NOT_FOUND {
if sRes.Status.Code == rpc.Code_CODE_PERMISSION_DENIED {
// the token expired during upload, so the stat failed
// and we can't do anything about it.
@@ -330,10 +331,6 @@ func (s *svc) handleTusPost(ctx context.Context, w http.ResponseWriter, r *http.
w.WriteHeader(http.StatusInternalServerError)
return
}
if httpRes != nil && httpRes.Header != nil && httpRes.Header.Get(net.HeaderOCMtime) != "" {
// set the "accepted" value if returned in the upload response headers
w.Header().Set(net.HeaderOCMtime, httpRes.Header.Get(net.HeaderOCMtime))
}
// get WebDav permissions for file
isPublic := false

View File

@@ -31,6 +31,7 @@ import (
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/shareid"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/mtimesyncedcache"
"github.com/cs3org/reva/v2/pkg/storage/utils/metadata"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
@@ -45,7 +46,7 @@ const tracerName = "sharecache"
type Cache struct {
lockMap sync.Map
UserShares map[string]*UserShareCache
UserShares mtimesyncedcache.Map[string, *UserShareCache]
storage metadata.Storage
namespace string
@@ -76,7 +77,7 @@ func (c *Cache) lockUser(userID string) func() {
// New returns a new Cache instance
func New(s metadata.Storage, namespace, filename string, ttl time.Duration) Cache {
return Cache{
UserShares: map[string]*UserShareCache{},
UserShares: mtimesyncedcache.Map[string, *UserShareCache]{},
storage: s,
namespace: namespace,
filename: filename,
@@ -93,7 +94,7 @@ func (c *Cache) Add(ctx context.Context, userid, shareID string) error {
span.SetAttributes(attribute.String("cs3.userid", userid))
defer unlock()
if c.UserShares[userid] == nil {
if _, ok := c.UserShares.Load(userid); !ok {
err := c.syncWithLock(ctx, userid)
if err != nil {
return err
@@ -111,7 +112,8 @@ func (c *Cache) Add(ctx context.Context, userid, shareID string) error {
c.initializeIfNeeded(userid, ssid)
// add share id
c.UserShares[userid].UserShares[ssid].IDs[shareID] = struct{}{}
us, _ := c.UserShares.Load(userid)
us.UserShares[ssid].IDs[shareID] = struct{}{}
return c.Persist(ctx, userid)
}
@@ -158,7 +160,7 @@ func (c *Cache) Remove(ctx context.Context, userid, shareID string) error {
span.SetAttributes(attribute.String("cs3.userid", userid))
defer unlock()
if c.UserShares[userid] == nil {
if _, ok := c.UserShares.Load(userid); ok {
err := c.syncWithLock(ctx, userid)
if err != nil {
return err
@@ -173,15 +175,13 @@ func (c *Cache) Remove(ctx context.Context, userid, shareID string) error {
ssid := storageid + shareid.IDDelimiter + spaceid
persistFunc := func() error {
if c.UserShares[userid] == nil {
c.UserShares[userid] = &UserShareCache{
UserShares: map[string]*SpaceShareIDs{},
}
}
us, loaded := c.UserShares.LoadOrStore(userid, &UserShareCache{
UserShares: map[string]*SpaceShareIDs{},
})
if c.UserShares[userid].UserShares[ssid] != nil {
if loaded {
// remove share id
delete(c.UserShares[userid].UserShares[ssid].IDs, shareID)
delete(us.UserShares[ssid].IDs, shareID)
}
return c.Persist(ctx, userid)
@@ -234,11 +234,12 @@ func (c *Cache) List(ctx context.Context, userid string) (map[string]SpaceShareI
}
r := map[string]SpaceShareIDs{}
if c.UserShares[userid] == nil {
us, ok := c.UserShares.Load(userid)
if !ok {
return r, nil
}
for ssid, cached := range c.UserShares[userid].UserShares {
for ssid, cached := range us.UserShares {
r[ssid] = SpaceShareIDs{
IDs: cached.IDs,
}
@@ -261,8 +262,8 @@ func (c *Cache) syncWithLock(ctx context.Context, userID string) error {
dlreq := metadata.DownloadRequest{
Path: userCreatedPath,
}
if c.UserShares[userID].Etag != "" {
dlreq.IfNoneMatch = []string{c.UserShares[userID].Etag}
if us, ok := c.UserShares.Load(userID); ok && us.Etag != "" {
dlreq.IfNoneMatch = []string{us.Etag}
}
dlres, err := c.storage.Download(ctx, dlreq)
@@ -290,7 +291,7 @@ func (c *Cache) syncWithLock(ctx context.Context, userID string) error {
}
newShareCache.Etag = dlres.Etag
c.UserShares[userID] = newShareCache
c.UserShares.Store(userID, newShareCache)
span.SetStatus(codes.Ok, "")
return nil
}
@@ -301,7 +302,12 @@ func (c *Cache) Persist(ctx context.Context, userid string) error {
defer span.End()
span.SetAttributes(attribute.String("cs3.userid", userid))
createdBytes, err := json.Marshal(c.UserShares[userid])
us, ok := c.UserShares.Load(userid)
if !ok {
span.SetStatus(codes.Ok, "no user shares")
return nil
}
createdBytes, err := json.Marshal(us)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
@@ -317,11 +323,11 @@ func (c *Cache) Persist(ctx context.Context, userid string) error {
ur := metadata.UploadRequest{
Path: jsonPath,
Content: createdBytes,
IfMatchEtag: c.UserShares[userid].Etag,
IfMatchEtag: us.Etag,
}
// when there is no etag in memory make sure the file has not been created on the server, see https://www.rfc-editor.org/rfc/rfc9110#field.if-match
// > If the field value is "*", the condition is false if the origin server has a current representation for the target resource.
if c.UserShares[userid].Etag == "" {
if us.Etag == "" {
ur.IfNoneMatch = []string{"*"}
}
res, err := c.storage.Upload(ctx, ur)
@@ -330,7 +336,7 @@ func (c *Cache) Persist(ctx context.Context, userid string) error {
span.SetStatus(codes.Error, err.Error())
return err
}
c.UserShares[userid].Etag = res.Etag
us.Etag = res.Etag
span.SetStatus(codes.Ok, "")
return nil
}
@@ -340,13 +346,11 @@ func (c *Cache) userCreatedPath(userid string) string {
}
func (c *Cache) initializeIfNeeded(userid, ssid string) {
if c.UserShares[userid] == nil {
c.UserShares[userid] = &UserShareCache{
UserShares: map[string]*SpaceShareIDs{},
}
}
if ssid != "" && c.UserShares[userid].UserShares[ssid] == nil {
c.UserShares[userid].UserShares[ssid] = &SpaceShareIDs{
us, _ := c.UserShares.LoadOrStore(userid, &UserShareCache{
UserShares: map[string]*SpaceShareIDs{},
})
if ssid != "" && us.UserShares[ssid] == nil {
us.UserShares[ssid] = &SpaceShareIDs{
IDs: map[string]struct{}{},
}
}

View File

@@ -113,7 +113,8 @@ var responses = map[string]Response{
`POST /apps/sciencemesh/~f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c/api/storage/GetMD {"ref":{"path":"/file"},"mdKeys":null}`: {404, ``, serverStateEmpty},
`POST /apps/sciencemesh/~f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c/api/storage/InitiateUpload {"ref":{"path":"/file"},"uploadLength":0,"metadata":{"providerID":""}}`: {200, `{"simple": "yes","tus": "yes"}`, serverStateEmpty},
`POST /apps/sciencemesh/~f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c/api/storage/InitiateUpload {"ref":{"resource_id":{"storage_id":"f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c"},"path":"/versionedFile"},"uploadLength":0,"metadata":{}}`: {200, `{"simple": "yes","tus": "yes"}`, serverStateEmpty},
`POST /apps/sciencemesh/~f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c/api/storage/InitiateUpload {"ref":{"resource_id":{"storage_id":"f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c"},"path":"/versionedFile"},"uploadLength":1,"metadata":{}}`: {200, `{"simple": "yes","tus": "yes"}`, serverStateEmpty},
`POST /apps/sciencemesh/~f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c/api/storage/InitiateUpload {"ref":{"resource_id":{"storage_id":"f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c"},"path":"/versionedFile"},"uploadLength":2,"metadata":{}}`: {200, `{"simple": "yes","tus": "yes"}`, serverStateEmpty},
`POST /apps/sciencemesh/~f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c/api/storage/GetMD {"ref":{"path":"/yes"},"mdKeys":[]}`: {200, `{"opaque":{},"type":1,"id":{"opaque_id":"fileid-/yes"},"checksum":{},"etag":"deadbeef","mime_type":"text/plain","mtime":{"seconds":1234567890},"path":"/yes","permission_set":{},"size":1,"canonical_metadata":{},"arbitrary_metadata":{}}`, serverStateEmpty},

View File

@@ -122,7 +122,7 @@ type SessionStore interface {
New(ctx context.Context) *upload.OcisSession
List(ctx context.Context) ([]*upload.OcisSession, error)
Get(ctx context.Context, id string) (*upload.OcisSession, error)
Cleanup(ctx context.Context, session upload.Session, failure bool, keepUpload bool)
Cleanup(ctx context.Context, session upload.Session, revertNodeMetadata, keepUpload, unmarkPostprocessing bool)
}
// Decomposedfs provides the base for decomposed filesystem implementations
@@ -281,7 +281,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
}
if !n.Exists {
log.Debug().Str("uploadID", ev.UploadID).Str("nodeID", session.NodeID()).Msg("node no longer exists")
fs.sessionStore.Cleanup(ctx, session, false, false)
fs.sessionStore.Cleanup(ctx, session, false, false, false)
continue
}
@@ -289,6 +289,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
failed bool
keepUpload bool
)
unmarkPostprocessing := true
switch ev.Outcome {
default:
@@ -301,8 +302,10 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
case events.PPOutcomeContinue:
if err := session.Finalize(); err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not finalize upload")
keepUpload = true // should we keep the upload when assembling failed?
failed = true
keepUpload = true
// keep postprocessing status so the upload is not deleted during housekeeping
unmarkPostprocessing = false
} else {
metrics.UploadSessionsFinalized.Inc()
}
@@ -334,7 +337,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
}
}
fs.sessionStore.Cleanup(ctx, session, failed, keepUpload)
fs.sessionStore.Cleanup(ctx, session, failed, keepUpload, unmarkPostprocessing)
// remove cache entry in gateway
fs.cache.RemoveStatContext(ctx, ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID})

View File

@@ -263,10 +263,6 @@ func (fs *Decomposedfs) RestoreRevision(ctx context.Context, ref *provider.Refer
// copy blob metadata from restored revision to node
restoredRevisionPath := fs.lu.InternalPath(spaceID, revisionKey)
err = fs.lu.CopyMetadata(ctx, restoredRevisionPath, nodePath, func(attributeName string, value []byte) (newValue []byte, copy bool) {
if attributeName == prefixes.MTimeAttr {
// update mtime
return []byte(time.Now().UTC().Format(time.RFC3339Nano)), true
}
return value, strings.HasPrefix(attributeName, prefixes.ChecksumPrefix) ||
attributeName == prefixes.TypeAttr ||
attributeName == prefixes.BlobIDAttr ||
@@ -275,6 +271,15 @@ func (fs *Decomposedfs) RestoreRevision(ctx context.Context, ref *provider.Refer
if err != nil {
return errtypes.InternalError("failed to copy blob xattrs to old revision to node: " + err.Error())
}
// always set the node mtime to the current time
err = fs.lu.MetadataBackend().SetMultiple(ctx, nodePath,
map[string][]byte{
prefixes.MTimeAttr: []byte(time.Now().UTC().Format(time.RFC3339Nano)),
},
false)
if err != nil {
return errtypes.InternalError("failed to set mtime attribute on node: " + err.Error())
}
revisionSize, err := fs.lu.MetadataBackend().GetInt64(ctx, restoredRevisionPath, prefixes.BlobsizeAttr)
if err != nil {

View File

@@ -152,6 +152,11 @@ func (t *Tree) TouchFile(ctx context.Context, n *node.Node, markprocessing bool,
if err := n.SetMtimeString(ctx, mtime); err != nil {
return errors.Wrap(err, "Decomposedfs: could not set mtime")
}
} else {
now := time.Now()
if err := n.SetMtime(ctx, &now); err != nil {
return errors.Wrap(err, "Decomposedfs: could not set mtime")
}
}
err = n.SetXattrsWithContext(ctx, attributes, true)
if err != nil {

View File

@@ -299,6 +299,14 @@ func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Refere
metrics.UploadSessionsInitiated.Inc()
if uploadLength == 0 {
// Directly finish this upload
err = session.FinishUpload(ctx)
if err != nil {
return nil, err
}
}
return map[string]string{
"simple": session.ID(),
"tus": session.ID(),

View File

@@ -150,25 +150,27 @@ type Session interface {
ID() string
Node(ctx context.Context) (*node.Node, error)
Context(ctx context.Context) context.Context
Cleanup(cleanNode, cleanBin, cleanInfo bool)
Cleanup(revertNodeMetadata, cleanBin, cleanInfo bool)
}
// Cleanup cleans upload metadata, binary data and processing status as necessary
func (store OcisStore) Cleanup(ctx context.Context, session Session, failure bool, keepUpload bool) {
func (store OcisStore) Cleanup(ctx context.Context, session Session, revertNodeMetadata, keepUpload, unmarkPostprocessing bool) {
ctx, span := tracer.Start(session.Context(ctx), "Cleanup")
defer span.End()
session.Cleanup(failure, !keepUpload, !keepUpload)
session.Cleanup(revertNodeMetadata, !keepUpload, !keepUpload)
// unset processing status
n, err := session.Node(ctx)
if err != nil {
appctx.GetLogger(ctx).Info().Str("session", session.ID()).Err(err).Msg("could not read node")
return
}
// FIXME: after cleanup the node might already be deleted ...
if n != nil { // node can be nil when there was an error before it was created (eg. checksum-mismatch)
if err := n.UnmarkProcessing(ctx, session.ID()); err != nil {
appctx.GetLogger(ctx).Info().Str("path", n.InternalPath()).Err(err).Msg("unmarking processing failed")
if unmarkPostprocessing {
n, err := session.Node(ctx)
if err != nil {
appctx.GetLogger(ctx).Info().Str("session", session.ID()).Err(err).Msg("could not read node")
return
}
// FIXME: after cleanup the node might already be deleted ...
if n != nil { // node can be nil when there was an error before it was created (eg. checksum-mismatch)
if err := n.UnmarkProcessing(ctx, session.ID()); err != nil {
appctx.GetLogger(ctx).Info().Str("path", n.InternalPath()).Err(err).Msg("unmarking processing failed")
}
}
}
}

View File

@@ -177,7 +177,7 @@ func (session *OcisSession) FinishUpload(ctx context.Context) error {
err = errtypes.BadRequest("unsupported checksum algorithm: " + parts[0])
}
if err != nil {
session.store.Cleanup(ctx, session, true, false)
session.store.Cleanup(ctx, session, true, false, false)
return err
}
}
@@ -191,7 +191,7 @@ func (session *OcisSession) FinishUpload(ctx context.Context) error {
n, err := session.store.CreateNodeForUpload(session, attrs)
if err != nil {
session.store.Cleanup(ctx, session, true, false)
session.store.Cleanup(ctx, session, true, false, true)
return err
}
@@ -223,7 +223,7 @@ func (session *OcisSession) FinishUpload(ctx context.Context) error {
if !session.store.async {
// handle postprocessing synchronously
err = session.Finalize()
session.store.Cleanup(ctx, session, err != nil, false)
session.store.Cleanup(ctx, session, err != nil, false, err == nil)
if err != nil {
log.Error().Err(err).Msg("failed to upload")
return err
@@ -312,10 +312,10 @@ func (session *OcisSession) removeNode(ctx context.Context) {
}
// cleanup cleans up after the upload is finished
func (session *OcisSession) Cleanup(cleanNode, cleanBin, cleanInfo bool) {
func (session *OcisSession) Cleanup(revertNodeMetadata, cleanBin, cleanInfo bool) {
ctx := session.Context(context.Background())
if cleanNode {
if revertNodeMetadata {
if session.NodeExists() {
p := session.info.MetaData["versionsPath"]
n, err := session.Node(ctx)

View File

@@ -181,6 +181,8 @@ func (cs3 *CS3) Upload(ctx context.Context, req UploadRequest) (*UploadResponse,
ifuReq.Opaque = utils.AppendPlainToOpaque(ifuReq.Opaque, "X-OC-Mtime", strconv.Itoa(int(req.MTime.Unix()))+"."+strconv.Itoa(req.MTime.Nanosecond()))
}
ifuReq.Opaque = utils.AppendPlainToOpaque(ifuReq.Opaque, net.HeaderUploadLength, strconv.FormatInt(int64(len(req.Content)), 10))
res, err := client.InitiateFileUpload(ctx, ifuReq)
if err != nil {
return nil, err

2
vendor/modules.txt vendored
View File

@@ -362,7 +362,7 @@ github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1
github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1
github.com/cs3org/go-cs3apis/cs3/tx/v1beta1
github.com/cs3org/go-cs3apis/cs3/types/v1beta1
# github.com/cs3org/reva/v2 v2.18.1-0.20240104084554-e85441869c2b
# github.com/cs3org/reva/v2 v2.18.1-0.20240115094008-bde86a38bd77
## explicit; go 1.21
github.com/cs3org/reva/v2/cmd/revad/internal/grace
github.com/cs3org/reva/v2/cmd/revad/runtime