Merge pull request #7083 from owncloud/release-4.0.0

[full-ci] Prepare Release 4.0.0
This commit is contained in:
kobergj
2023-08-25 08:41:22 +02:00
committed by GitHub
108 changed files with 1509 additions and 682 deletions

View File

@@ -1,4 +1,4 @@
Enhancement: Bump reva to latest edge
Enhancement: Bump reva to 2.16.0
* Bugfix [cs3org/reva#4086](https://github.com/cs3org/reva/pull/4086): Fix ocs status code for not enough permission response
* Bugfix [cs3org/reva#4078](https://github.com/cs3org/reva/pull/4078): fix the default document language for OnlyOffice
@@ -8,6 +8,22 @@ Enhancement: Bump reva to latest edge
* Bugfix [cs3org/reva#4076](https://github.com/cs3org/reva/pull/4076): Fix WebDAV permissions for space managers
* Bugfix [cs3org/reva#4078](https://github.com/cs3org/reva/pull/4078): fix the default document language for OnlyOffice
* Bugfix [cs3org/reva#4081](https://github.com/cs3org/reva/pull/4081): Propagate sizeDiff
* Bugfix [cs3org/reva#4051](https://github.com/cs3org/reva/pull/4051): Set treesize when creating a storage space
* Bugfix [cs3org/reva#4093](https://github.com/cs3org/reva/pull/4093): Fix the error handling
* Bugfix [cs3org/reva#4111](https://github.com/cs3org/reva/pull/4111): Return already exists error when child already exists
* Bugfix [cs3org/reva#4086](https://github.com/cs3org/reva/pull/4086): Fix ocs status code for not enough permission response
* Bugfix [cs3org/reva#4101](https://github.com/cs3org/reva/pull/4101): Make the jsoncs3 share manager indexes more robust
* Bugfix [cs3org/reva#4099](https://github.com/cs3org/reva/pull/4099): Fix logging upload errors
* Bugfix [cs3org/reva#4078](https://github.com/cs3org/reva/pull/4078): Fix the default document language for OnlyOffice
* Bugfix [cs3org/reva#4082](https://github.com/cs3org/reva/pull/4082): Fix propfind permissions
* Bugfix [cs3org/reva#4100](https://github.com/cs3org/reva/pull/4100): S3ng include md5 checksum on put
* Bugfix [cs3org/reva#4096](https://github.com/cs3org/reva/pull/4096): Fix the user shares list
* Bugfix [cs3org/reva#4076](https://github.com/cs3org/reva/pull/4076): Fix WebDAV permissions for space managers
* Bugfix [cs3org/reva#4117](https://github.com/cs3org/reva/pull/4117): Fix jsoncs3 atomic persistence
* Bugfix [cs3org/reva#4081](https://github.com/cs3org/reva/pull/4081): Propagate sizeDiff
* Bugfix [cs3org/reva#4091](https://github.com/cs3org/reva/pull/4091): Register WebDAV HTTP methods with chi
* Bugfix [cs3org/reva#4107](https://github.com/cs3org/reva/pull/4107): Return lock when requested
* Bugfix [cs3org/reva#4075](https://github.com/cs3org/reva/pull/4075): Revert 4065 - bypass proxy on upload
* Enhancement [cs3org/reva#4070](https://github.com/cs3org/reva/pull/4070): Selectable Propagators
* Enhancement [cs3org/reva#4074](https://github.com/cs3org/reva/pull/4074): Allow configuring the max size of grpc messages
* Enhancement [cs3org/reva#4085](https://github.com/cs3org/reva/pull/4085): Add registry refresh
@@ -15,10 +31,22 @@ Enhancement: Bump reva to latest edge
* Enhancement [cs3org/reva#4072](https://github.com/cs3org/reva/pull/4072): Allow to specify a shutdown timeout
* Enhancement [cs3org/reva#4083](https://github.com/cs3org/reva/pull/4083): Allow for rolling back migrations
* Enhancement [cs3org/reva#4014](https://github.com/cs3org/reva/pull/4014): En-/Disable DEPTH:inifinity in PROPFIND
* Enhancement [cs3org/reva#4089](https://github.com/cs3org/reva/pull/4089): Async propagation (experimental)
* Enhancement [cs3org/reva#4074](https://github.com/cs3org/reva/pull/4074): Allow configuring the max size of grpc messages
* Enhancement [cs3org/reva#4083](https://github.com/cs3org/reva/pull/4083): Allow for rolling back migrations
* Enhancement [cs3org/reva#4014](https://github.com/cs3org/reva/pull/4014): En-/Disable DEPTH:inifinity in PROPFIND
* Enhancement [cs3org/reva#4072](https://github.com/cs3org/reva/pull/4072): Allow to specify a shutdown timeout
* Enhancement [cs3org/reva#4103](https://github.com/cs3org/reva/pull/4103): Add .oform mimetype
* Enhancement [cs3org/reva#4098](https://github.com/cs3org/reva/pull/4098): Allow naming nats connections
* Enhancement [cs3org/reva#4085](https://github.com/cs3org/reva/pull/4085): Add registry refresh
* Enhancement [cs3org/reva#4097](https://github.com/cs3org/reva/pull/4097): Remove app ticker logs
* Enhancement [cs3org/reva#4090](https://github.com/cs3org/reva/pull/4090): Add Capability for sse
* Enhancement [cs3org/reva#4110](https://github.com/cs3org/reva/pull/4110): Tracing events propgation
https://github.com/owncloud/ocis/pull/6899
https://github.com/owncloud/ocis/pull/6919
https://github.com/owncloud/ocis/pull/6928
https://github.com/owncloud/ocis/pull/6979
Update reva to v2.15.0

View File

@@ -1,5 +0,0 @@
Enhancement: Bump reva to latest edge
bumps reva to latest edge
https://github.com/owncloud/ocis/pull/6979

View File

@@ -31,7 +31,7 @@
env:
INSECURE: "false"
TRAEFIK_ACME_MAIL: wkloucek@owncloud.com
OCIS_DOCKER_TAG: 3.1.0-rc.1
OCIS_DOCKER_TAG: 4.0.0
OCIS_DOMAIN: ocis.ocis-keycloak.released.owncloud.works
KEYCLOAK_DOMAIN: keycloak.ocis-keycloak.released.owncloud.works
COMPOSE_FILE: docker-compose.yml:monitoring_tracing/docker-compose-additions.yml

View File

@@ -31,7 +31,7 @@
env:
INSECURE: "false"
TRAEFIK_ACME_MAIL: wkloucek@owncloud.com
OCIS_DOCKER_TAG: 3.1.0-rc.1
OCIS_DOCKER_TAG: 4.0.0
OCIS_DOMAIN: ocis.ocis-ldap.released.owncloud.works
LDAP_MANAGER_DOMAIN: ldap.ocis-ldap.released.owncloud.works
COMPOSE_FILE: docker-compose.yml:monitoring_tracing/docker-compose-additions.yml

View File

@@ -31,7 +31,7 @@
env:
INSECURE: "false"
TRAEFIK_ACME_MAIL: wkloucek@owncloud.com
OCIS_DOCKER_TAG: 3.1.0-rc.1
OCIS_DOCKER_TAG: 4.0.0
OCIS_DOMAIN: ocis.ocis-traefik.released.owncloud.works
DEMO_USERS: "true"
INBUCKET_DOMAIN: mail.ocis-traefik.released.owncloud.works

View File

@@ -31,7 +31,7 @@
env:
INSECURE: "false"
TRAEFIK_ACME_MAIL: wkloucek@owncloud.com
OCIS_DOCKER_TAG: 3.1.0-rc.1
OCIS_DOCKER_TAG: 4.0.0
OCIS_DOMAIN: ocis.ocis-wopi.released.owncloud.works
WOPISERVER_DOMAIN: wopiserver.ocis-wopi.released.owncloud.works
COLLABORA_DOMAIN: collabora.ocis-wopi.released.owncloud.works

2
go.mod
View File

@@ -13,7 +13,7 @@ require (
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/coreos/go-oidc/v3 v3.6.0
github.com/cs3org/go-cs3apis v0.0.0-20230516150832-730ac860c71d
github.com/cs3org/reva/v2 v2.15.1-0.20230816081257-e3a2be91bc4f
github.com/cs3org/reva/v2 v2.16.0
github.com/disintegration/imaging v1.6.2
github.com/dutchcoders/go-clamd v0.0.0-20170520113014-b970184f4d9e
github.com/egirna/icap-client v0.1.1

2
go.sum
View File

@@ -866,6 +866,8 @@ github.com/crewjam/saml v0.4.13 h1:TYHggH/hwP7eArqiXSJUvtOPNzQDyQ7vwmwEqlFWhMc=
github.com/crewjam/saml v0.4.13/go.mod h1:igEejV+fihTIlHXYP8zOec3V5A8y3lws5bQBFsTm4gA=
github.com/cs3org/reva/v2 v2.15.1-0.20230816081257-e3a2be91bc4f h1:s0sBJbIB8atyhujVx/OaadujuRHer8ODPpWxyGWfw/s=
github.com/cs3org/reva/v2 v2.15.1-0.20230816081257-e3a2be91bc4f/go.mod h1:6GyXffmxluCqQxXaYuVC2Dg10gj0QW199iVlxV0EAJg=
github.com/cs3org/reva/v2 v2.16.0 h1:XBFoGhzKrcDqqRvYdicOpDOpQVIsRqALmEt8X0N+wm0=
github.com/cs3org/reva/v2 v2.16.0/go.mod h1:RvhuweTFqzezjUFU0SIdTXakrEx9vJlMvQ7znPXSP1g=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=

View File

@@ -16,7 +16,7 @@ var (
// LatestTag is the latest released version plus the dev meta version.
// Will be overwritten by the release pipeline
// Needs a manual change for every tagged release
LatestTag = "3.1.0-rc.1+dev"
LatestTag = "4.0.0+dev"
// Date indicates the build date.
// This has been removed, it looks like you can only replace static strings with recent go versions

View File

@@ -2,7 +2,7 @@
sonar.projectKey=owncloud_ocis
sonar.organization=owncloud-1
sonar.projectName=ocis
sonar.projectVersion=3.1.0-rc.1
sonar.projectVersion=4.0.0
sonar.host.url=https://sonarcloud.io
# =====================================================

View File

@@ -52,6 +52,13 @@ func (s *svc) RemoveShare(ctx context.Context, req *collaboration.RemoveShareReq
return s.removeShare(ctx, req)
}
func (s *svc) UpdateShare(ctx context.Context, req *collaboration.UpdateShareRequest) (*collaboration.UpdateShareResponse, error) {
if !s.c.UseCommonSpaceRootShareLogic && refIsSpaceRoot(req.GetShare().GetResourceId()) {
return s.updateSpaceShare(ctx, req)
}
return s.updateShare(ctx, req)
}
// TODO(labkode): we need to validate share state vs storage grant and storage ref
// If there are any inconsistencies, the share needs to be flag as invalid and a background process
// or active fix needs to be performed.
@@ -98,7 +105,7 @@ func (s *svc) ListShares(ctx context.Context, req *collaboration.ListSharesReque
return res, nil
}
func (s *svc) UpdateShare(ctx context.Context, req *collaboration.UpdateShareRequest) (*collaboration.UpdateShareResponse, error) {
func (s *svc) updateShare(ctx context.Context, req *collaboration.UpdateShareRequest) (*collaboration.UpdateShareResponse, error) {
c, err := pool.GetUserShareProviderClient(s.c.UserShareProviderEndpoint)
if err != nil {
appctx.GetLogger(ctx).
@@ -114,9 +121,14 @@ func (s *svc) UpdateShare(ctx context.Context, req *collaboration.UpdateShareReq
}
if s.c.CommitShareToStorageGrant {
updateGrantStatus, err := s.updateGrant(ctx, res.GetShare().GetResourceId(),
res.GetShare().GetGrantee(),
res.GetShare().GetPermissions().GetPermissions())
creator := ctxpkg.ContextMustGetUser(ctx)
grant := &provider.Grant{
Grantee: req.GetShare().GetGrantee(),
Permissions: req.GetShare().GetPermissions().GetPermissions(),
Expiration: req.GetShare().GetExpiration(),
Creator: creator.GetId(),
}
updateGrantStatus, err := s.updateGrant(ctx, res.GetShare().GetResourceId(), grant, nil)
if err != nil {
return nil, errors.Wrap(err, "gateway: error calling updateGrant")
@@ -134,6 +146,51 @@ func (s *svc) UpdateShare(ctx context.Context, req *collaboration.UpdateShareReq
return res, nil
}
func (s *svc) updateSpaceShare(ctx context.Context, req *collaboration.UpdateShareRequest) (*collaboration.UpdateShareResponse, error) {
// If the share is a denial we call denyGrant instead.
var st *rpc.Status
var err error
// TODO: change CS3 APIs
opaque := &typesv1beta1.Opaque{
Map: map[string]*typesv1beta1.OpaqueEntry{
"spacegrant": {},
},
}
utils.AppendPlainToOpaque(opaque, "spacetype", utils.ReadPlainFromOpaque(req.Opaque, "spacetype"))
creator := ctxpkg.ContextMustGetUser(ctx)
grant := &provider.Grant{
Grantee: req.GetShare().GetGrantee(),
Permissions: req.GetShare().GetPermissions().GetPermissions(),
Expiration: req.GetShare().GetExpiration(),
Creator: creator.GetId(),
}
if grants.PermissionsEqual(req.Share.GetPermissions().GetPermissions(), &provider.ResourcePermissions{}) {
st, err = s.denyGrant(ctx, req.GetShare().GetResourceId(), req.GetShare().GetGrantee(), opaque)
if err != nil {
return nil, errors.Wrap(err, "gateway: error denying grant in storage")
}
} else {
st, err = s.updateGrant(ctx, req.GetShare().GetResourceId(), grant, opaque)
if err != nil {
return nil, errors.Wrap(err, "gateway: error adding grant to storage")
}
}
res := &collaboration.UpdateShareResponse{
Status: st,
Share: req.Share,
}
if st.Code != rpc.Code_CODE_OK {
return res, nil
}
s.statCache.RemoveStatContext(ctx, ctxpkg.ContextMustGetUser(ctx).GetId(), req.GetShare().GetResourceId())
s.providerCache.RemoveListStorageProviders(req.GetShare().GetResourceId())
return res, nil
}
// TODO(labkode): listing received shares just goes to the user share manager and gets the list of
// received shares. The display name of the shares should be the a friendly name, like the basename
// of the original file.
@@ -333,19 +390,15 @@ func (s *svc) addGrant(ctx context.Context, id *provider.ResourceId, g *provider
return grantRes.Status, nil
}
func (s *svc) updateGrant(ctx context.Context, id *provider.ResourceId, g *provider.Grantee, p *provider.ResourcePermissions) (*rpc.Status, error) {
func (s *svc) updateGrant(ctx context.Context, id *provider.ResourceId, grant *provider.Grant, opaque *typesv1beta1.Opaque) (*rpc.Status, error) {
ref := &provider.Reference{
ResourceId: id,
}
creator := ctxpkg.ContextMustGetUser(ctx)
grantReq := &provider.UpdateGrantRequest{
Ref: ref,
Grant: &provider.Grant{
Grantee: g,
Permissions: p,
Creator: creator.GetId(),
},
Opaque: opaque,
Ref: ref,
Grant: grant,
}
c, _, err := s.find(ctx, ref)

View File

@@ -324,7 +324,13 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate
return nil, err
}
switch sRes.Status.Code {
case rpc.Code_CODE_OK, rpc.Code_CODE_NOT_FOUND:
case rpc.Code_CODE_OK:
if req.GetIfNotExist() {
return &provider.InitiateFileUploadResponse{
Status: status.NewAlreadyExists(ctx, errors.New("already exists"), "already exists"),
}, nil
}
case rpc.Code_CODE_NOT_FOUND:
// Just continue with a normal upload
default:
return &provider.InitiateFileUploadResponse{
@@ -342,10 +348,14 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate
}
metadata["if-match"] = ifMatch
}
if !validateIfUnmodifiedSince(req.GetIfUnmodifiedSince(), sRes.GetInfo()) {
return &provider.InitiateFileUploadResponse{
Status: status.NewFailedPrecondition(ctx, errors.New("resource has been modified"), "resource has been modified"),
}, nil
ifUnmodifiedSince := req.GetIfUnmodifiedSince()
if ifUnmodifiedSince != nil {
metadata["if-unmodified-since"] = utils.TSToTime(ifUnmodifiedSince).Format(time.RFC3339Nano)
if !validateIfUnmodifiedSince(ifUnmodifiedSince, sRes.GetInfo()) {
return &provider.InitiateFileUploadResponse{
Status: status.NewFailedPrecondition(ctx, errors.New("resource has been modified"), "resource has been modified"),
}, nil
}
}
ctx = ctxpkg.ContextSetLockID(ctx, req.LockId)
@@ -1080,6 +1090,19 @@ func (s *service) UpdateGrant(ctx context.Context, req *provider.UpdateGrantRequ
}
}
// TODO: update CS3 APIs
// FIXME these should be part of the AddGrantRequest object
// https://github.com/owncloud/ocis/issues/4312
if utils.ExistsInOpaque(req.Opaque, "spacegrant") {
ctx = context.WithValue(
ctx,
utils.SpaceGrant,
struct{ SpaceType string }{
SpaceType: utils.ReadPlainFromOpaque(req.Opaque, "spacetype"),
},
)
}
// check grantee type is valid
if req.Grant.Grantee.Type == provider.GranteeType_GRANTEE_TYPE_INVALID {
return &provider.UpdateGrantResponse{

View File

@@ -33,6 +33,7 @@ const (
HeaderLocation = "Location"
HeaderRange = "Range"
HeaderIfMatch = "If-Match"
HeaderIfNoneMatch = "If-None-Match"
HeaderPrefer = "Prefer"
HeaderPreferenceApplied = "Preference-Applied"
HeaderVary = "Vary"

View File

@@ -39,6 +39,7 @@ import (
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
sdk "github.com/cs3org/reva/v2/pkg/sdk/common"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/pkg/errors"
)
@@ -126,47 +127,72 @@ func (h *Handler) addSpaceMember(w http.ResponseWriter, r *http.Request, info *p
}
}
if role.Name != conversions.RoleManager {
ref := provider.Reference{ResourceId: info.GetId()}
p, err := h.findProvider(ctx, &ref)
if err != nil {
response.WriteOCSError(w, r, response.MetaNotFound.StatusCode, "error getting storage provider", err)
return
}
providerClient, err := h.getStorageProviderClient(p)
if err != nil {
response.WriteOCSError(w, r, response.MetaNotFound.StatusCode, "error getting storage provider client", err)
return
}
lgRes, err := providerClient.ListGrants(ctx, &provider.ListGrantsRequest{Ref: &ref})
if err != nil || lgRes.Status.Code != rpc.Code_CODE_OK {
response.WriteOCSError(w, r, response.MetaServerError.StatusCode, "error listing space grants", err)
return
}
if !isSpaceManagerRemaining(lgRes.Grants, grantee) {
response.WriteOCSError(w, r, http.StatusForbidden, "the space must have at least one manager", nil)
return
}
ref := provider.Reference{ResourceId: info.GetId()}
p, err := h.findProvider(ctx, &ref)
if err != nil {
response.WriteOCSError(w, r, response.MetaNotFound.StatusCode, "error getting storage provider", err)
return
}
createShareRes, err := client.CreateShare(ctx, &collaborationv1beta1.CreateShareRequest{
ResourceInfo: info,
Grant: &collaborationv1beta1.ShareGrant{
Permissions: &collaborationv1beta1.SharePermissions{
Permissions: permissions,
},
Grantee: &grantee,
Expiration: expirationTs,
},
})
if err != nil || createShareRes.Status.Code != rpc.Code_CODE_OK {
response.WriteOCSError(w, r, response.MetaServerError.StatusCode, "could not add space member", err)
providerClient, err := h.getStorageProviderClient(p)
if err != nil {
response.WriteOCSError(w, r, response.MetaNotFound.StatusCode, "error getting storage provider client", err)
return
}
lgRes, err := providerClient.ListGrants(ctx, &provider.ListGrantsRequest{Ref: &ref})
if err != nil || lgRes.Status.Code != rpc.Code_CODE_OK {
response.WriteOCSError(w, r, response.MetaServerError.StatusCode, "error listing space grants", err)
return
}
if !isSpaceManagerRemaining(lgRes.Grants, grantee) {
response.WriteOCSError(w, r, http.StatusForbidden, "the space must have at least one manager", nil)
return
}
// we have to send the update request to the gateway to give it a chance to invalidate its cache
// TODO the gateway no longer should cache stuff because invalidation is to expensive. The decomposedfs already has a better cache.
if granteeExists(lgRes.Grants, grantee) {
updateShareReq := &collaborationv1beta1.UpdateShareRequest{
// TODO: change CS3 APIs
Opaque: &types.Opaque{
Map: map[string]*types.OpaqueEntry{
"spacegrant": {},
},
},
Share: &collaborationv1beta1.Share{
ResourceId: ref.GetResourceId(),
Permissions: &collaborationv1beta1.SharePermissions{
Permissions: permissions,
},
Grantee: &grantee,
Expiration: expirationTs,
},
}
updateShareReq.Opaque = utils.AppendPlainToOpaque(updateShareReq.Opaque, "spacetype", info.GetSpace().GetSpaceType())
updateShareRes, err := client.UpdateShare(ctx, updateShareReq)
if err != nil || updateShareRes.Status.Code != rpc.Code_CODE_OK {
response.WriteOCSError(w, r, response.MetaServerError.StatusCode, "could not update space member grant", err)
return
}
} else {
createShareRes, err := client.CreateShare(ctx, &collaborationv1beta1.CreateShareRequest{
ResourceInfo: info,
Grant: &collaborationv1beta1.ShareGrant{
Permissions: &collaborationv1beta1.SharePermissions{
Permissions: permissions,
},
Grantee: &grantee,
Expiration: expirationTs,
},
})
if err != nil || createShareRes.Status.Code != rpc.Code_CODE_OK {
response.WriteOCSError(w, r, response.MetaServerError.StatusCode, "could not add space member grant", err)
return
}
}
response.WriteOCSSuccess(w, r, nil)
}
@@ -325,6 +351,15 @@ func isSpaceManagerRemaining(grants []*provider.Grant, grantee provider.Grantee)
return false
}
func granteeExists(grants []*provider.Grant, grantee provider.Grantee) bool {
for _, g := range grants {
if isEqualGrantee(*g.Grantee, grantee) {
return true
}
}
return false
}
func isEqualGrantee(a, b provider.Grantee) bool {
// Ideally we would want to use utils.GranteeEqual()
// but the grants stored in the decomposedfs aren't complete (missing usertype and idp)

View File

@@ -23,6 +23,7 @@
package errtypes
import (
"net/http"
"strings"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
@@ -172,6 +173,19 @@ func (e InsufficientStorage) StatusCode() int {
return StatusInsufficientStorage
}
// NotModified is the error to use when a resource was not modified, e.g. the requested etag did not change.
type NotModified string
func (e NotModified) Error() string { return "error: not modified: " + string(e) }
// NotModified implements the IsNotModified interface.
func (e NotModified) IsNotModified() {}
// StatusCode returns StatusInsufficientStorage, this implementation is needed to allow TUS to cast the correct http errors.
func (e NotModified) StatusCode() int {
return http.StatusNotModified
}
// Body returns the error body. This implementation is needed to allow TUS to cast the correct http errors
func (e InsufficientStorage) Body() []byte {
return []byte(e.Error())
@@ -302,3 +316,37 @@ func NewErrtypeFromStatus(status *rpc.Status) error {
return InternalError(status.Message)
}
}
// NewErrtypeFromHTTPStatusCode maps an http status to an errtype
func NewErrtypeFromHTTPStatusCode(code int, message string) error {
switch code {
case http.StatusOK:
return nil
case http.StatusNotFound:
return NotFound(message)
case http.StatusConflict:
return AlreadyExists(message)
case http.StatusNotImplemented:
return NotSupported(message)
case http.StatusNotModified:
return NotModified(message)
case http.StatusForbidden:
return PermissionDenied(message)
case http.StatusLocked:
return Locked(message)
case http.StatusPreconditionFailed:
return Aborted(message)
case http.StatusMethodNotAllowed:
return PreconditionFailed(message)
case http.StatusInsufficientStorage:
return InsufficientStorage(message)
case http.StatusBadRequest:
return BadRequest(message)
case http.StatusPartialContent:
return PartialContent(message)
case StatusChecksumMismatch:
return ChecksumMismatch(message)
default:
return InternalError(message)
}
}

View File

@@ -102,9 +102,10 @@ func (p *cs3) Write(ctx context.Context, db persistence.PublicShares) error {
return err
}
return p.s.Upload(ctx, metadata.UploadRequest{
_, err = p.s.Upload(ctx, metadata.UploadRequest{
Content: dbAsJSON,
Path: "publicshares.json",
IfUnmodifiedSince: p.db.mtime,
})
return err
}

View File

@@ -129,7 +129,7 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
w.WriteHeader(http.StatusUnauthorized)
case errtypes.InsufficientStorage:
w.WriteHeader(http.StatusInsufficientStorage)
case errtypes.PreconditionFailed, errtypes.Aborted:
case errtypes.PreconditionFailed, errtypes.Aborted, errtypes.AlreadyExists:
w.WriteHeader(http.StatusPreconditionFailed)
default:
sublog.Error().Err(v).Msg("error uploading file")

View File

@@ -144,7 +144,7 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
w.WriteHeader(http.StatusUnauthorized)
case errtypes.InsufficientStorage:
w.WriteHeader(http.StatusInsufficientStorage)
case errtypes.PreconditionFailed, errtypes.Aborted:
case errtypes.PreconditionFailed, errtypes.Aborted, errtypes.AlreadyExists:
w.WriteHeader(http.StatusPreconditionFailed)
default:
sublog.Error().Err(v).Msg("error uploading file")

View File

@@ -20,6 +20,7 @@
package download
import (
"context"
"fmt"
"io"
"mime/multipart"
@@ -39,6 +40,25 @@ import (
"github.com/rs/zerolog"
)
type contextKey struct{}
var etagKey = contextKey{}
// ContextWithEtag returns a new `context.Context` that holds an etag.
func ContextWithEtag(ctx context.Context, etag string) context.Context {
return context.WithValue(ctx, etagKey, etag)
}
// EtagFromContext returns the etag previously associated with `ctx`, or
// `""` if no such etag could be found.
func EtagFromContext(ctx context.Context) string {
val := ctx.Value(etagKey)
if etag, ok := val.(string); ok {
return etag
}
return ""
}
// GetOrHeadFile returns the requested file content
func GetOrHeadFile(w http.ResponseWriter, r *http.Request, fs storage.FS, spaceID string) {
ctx := r.Context()
@@ -75,11 +95,25 @@ func GetOrHeadFile(w http.ResponseWriter, r *http.Request, fs storage.FS, spaceI
// do a stat to set a Content-Length header
if md, err = fs.GetMD(ctx, ref, nil, []string{"size", "mimetype"}); err != nil {
if md, err = fs.GetMD(ctx, ref, nil, []string{"size", "mimetype", "etag"}); err != nil {
handleError(w, &sublog, err, "stat")
return
}
// check etag, see https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/If-None-Match
for _, etag := range r.Header.Values(net.HeaderIfNoneMatch) {
if md.Etag == etag {
// When the condition fails for GET and HEAD methods, then the server must return
// HTTP status code 304 (Not Modified). [...] Note that the server generating a
// 304 response MUST generate any of the following header fields that would have
// been sent in a 200 (OK) response to the same request:
// Cache-Control, Content-Location, Date, ETag, Expires, and Vary.
w.Header().Set(net.HeaderETag, md.Etag)
w.WriteHeader(http.StatusNotModified)
return
}
}
// fill in storage provider id if it is missing
if spaceID != "" && md.GetId().GetStorageId() == "" {
md.Id.StorageId = ref.ResourceId.StorageId
@@ -107,6 +141,7 @@ func GetOrHeadFile(w http.ResponseWriter, r *http.Request, fs storage.FS, spaceI
}
}
ctx = ContextWithEtag(ctx, md.Etag)
content, err := fs.Download(ctx, ref)
if err != nil {
handleError(w, &sublog, err, "download")

View File

@@ -92,19 +92,22 @@ import (
2. create /users/{userid}/created.json if it doesn't exist yet and add the space/share
3. create /users/{userid}/received.json or /groups/{groupid}/received.json if it doesn exist yet and add the space/share
When updating shares /storages/{storageid}/{spaceid}.json is updated accordingly. The mtime is used to invalidate in-memory caches:
When updating shares /storages/{storageid}/{spaceid}.json is updated accordingly. The etag is used to invalidate in-memory caches:
- TODO the upload is tried with an if-unmodified-since header
- TODO when if fails, the {spaceid}.json file is downloaded, the changes are reapplied and the upload is retried with the new mtime
- TODO when if fails, the {spaceid}.json file is downloaded, the changes are reapplied and the upload is retried with the new etag
When updating received shares the mountpoint and state are updated in /users/{userid}/received.json (for both user and group shares).
When reading the list of received shares the /users/{userid}/received.json file and the /groups/{groupid}/received.json files are statted.
- if the mtime changed we download the file to update the local cache
- if the etag changed we download the file to update the local cache
When reading the list of created shares the /users/{userid}/created.json file is statted
- if the mtime changed we download the file to update the local cache
- if the etag changed we download the file to update the local cache
*/
// TODO implement a channel based aggregation of sharing requests: every in memory cache should read as many share updates to a space that are available and update them all in one go
// whenever a persist operation fails we check if we can read more shares from the channel
// name is the Tracer name used to identify this instrumentation library.
const tracerName = "jsoncs3"
@@ -365,7 +368,7 @@ func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *colla
func (m *Manager) getByID(ctx context.Context, id *collaboration.ShareId) (*collaboration.Share, error) {
storageID, spaceID, _ := shareid.Decode(id.OpaqueId)
share, err := m.Cache.Get(ctx, storageID, spaceID, id.OpaqueId)
share, err := m.Cache.Get(ctx, storageID, spaceID, id.OpaqueId, false)
if err != nil {
return nil, err
}
@@ -655,41 +658,98 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User,
return nil, err
}
var ss []*collaboration.Share
for ssid, spaceShareIDs := range list {
storageID, spaceID, _ := shareid.Decode(ssid)
spaceShares, err := m.Cache.ListSpace(ctx, storageID, spaceID)
if err != nil {
continue
}
for shareid := range spaceShareIDs.IDs {
s := spaceShares.Shares[shareid]
if s == nil {
continue
}
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
log.Error().Err(err).
Msg("failed to unshare expired share")
}
if err := events.Publish(ctx, m.eventStream, events.ShareExpired{
ShareOwner: s.GetOwner(),
ItemID: s.GetResourceId(),
ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())),
GranteeUserID: s.GetGrantee().GetUserId(),
GranteeGroupID: s.GetGrantee().GetGroupId(),
}); err != nil {
log.Error().Err(err).
Msg("failed to publish share expired event")
}
continue
}
if utils.UserEqual(user.GetId(), s.GetCreator()) {
if share.MatchesFilters(s, filters) {
ss = append(ss, s)
}
numWorkers := m.MaxConcurrency
if numWorkers == 0 || len(list) < numWorkers {
numWorkers = len(list)
}
type w struct {
ssid string
ids sharecache.SpaceShareIDs
}
work := make(chan w)
results := make(chan *collaboration.Share)
g, ctx := errgroup.WithContext(ctx)
// Distribute work
g.Go(func() error {
defer close(work)
for ssid, ids := range list {
select {
case work <- w{ssid, ids}:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
})
// Spawn workers that'll concurrently work the queue
for i := 0; i < numWorkers; i++ {
g.Go(func() error {
for w := range work {
storageID, spaceID, _ := shareid.Decode(w.ssid)
// fetch all shares from space with one request
_, err := m.Cache.ListSpace(ctx, storageID, spaceID)
if err != nil {
log.Error().Err(err).
Str("storageid", storageID).
Str("spaceid", spaceID).
Msg("failed to list shares in space")
continue
}
for shareID := range w.ids.IDs {
s, err := m.Cache.Get(ctx, storageID, spaceID, shareID, true)
if err != nil || s == nil {
continue
}
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
log.Error().Err(err).
Msg("failed to unshare expired share")
}
if err := events.Publish(ctx, m.eventStream, events.ShareExpired{
ShareOwner: s.GetOwner(),
ItemID: s.GetResourceId(),
ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())),
GranteeUserID: s.GetGrantee().GetUserId(),
GranteeGroupID: s.GetGrantee().GetGroupId(),
}); err != nil {
log.Error().Err(err).
Msg("failed to publish share expired event")
}
continue
}
if utils.UserEqual(user.GetId(), s.GetCreator()) {
if share.MatchesFilters(s, filters) {
select {
case results <- s:
case <-ctx.Done():
return ctx.Err()
}
}
}
}
}
return nil
})
}
// Wait for things to settle down, then close results chan
go func() {
_ = g.Wait() // error is checked later
close(results)
}()
ss := []*collaboration.Share{}
for n := range results {
ss = append(ss, n)
}
if err := g.Wait(); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
span.SetStatus(codes.Ok, "")
@@ -722,7 +782,6 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati
var ok bool
if rs, ok = ssids[ssid]; !ok {
rs = &receivedsharecache.Space{
Mtime: spaceShareIDs.Mtime,
States: make(map[string]*receivedsharecache.State, len(spaceShareIDs.IDs)),
}
ssids[ssid] = rs
@@ -785,8 +844,17 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati
g.Go(func() error {
for w := range work {
storageID, spaceID, _ := shareid.Decode(w.ssid)
// fetch all shares from space with one request
_, err := m.Cache.ListSpace(ctx, storageID, spaceID)
if err != nil {
log.Error().Err(err).
Str("storageid", storageID).
Str("spaceid", spaceID).
Msg("failed to list shares in space")
continue
}
for shareID, state := range w.rspace.States {
s, err := m.Cache.Get(ctx, storageID, spaceID, shareID)
s, err := m.Cache.Get(ctx, storageID, spaceID, shareID, true)
if err != nil || s == nil {
continue
}

View File

@@ -32,20 +32,26 @@ import (
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/mtimesyncedcache"
"github.com/cs3org/reva/v2/pkg/storage/utils/metadata"
"github.com/cs3org/reva/v2/pkg/utils"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"golang.org/x/exp/maps"
)
// name is the Tracer name used to identify this instrumentation library.
const tracerName = "providercache"
var tracer trace.Tracer
func init() {
tracer = otel.Tracer("github.com/cs3org/reva/pkg/share/manager/jsoncs3/providercache")
}
// Cache holds share information structured by provider and space
type Cache struct {
lockMap sync.Map
Providers map[string]*Spaces
Providers mtimesyncedcache.Map[string, *Spaces]
storage metadata.Storage
ttl time.Duration
@@ -53,14 +59,14 @@ type Cache struct {
// Spaces holds the share information for provider
type Spaces struct {
Spaces map[string]*Shares
Spaces mtimesyncedcache.Map[string, *Shares]
}
// Shares holds the share information of one space
type Shares struct {
Shares map[string]*collaboration.Share
Mtime time.Time
nextSync time.Time
Shares map[string]*collaboration.Share
Etag string
}
// UnmarshalJSON overrides the default unmarshaling
@@ -70,7 +76,6 @@ type Shares struct {
func (s *Shares) UnmarshalJSON(data []byte) error {
tmp := struct {
Shares map[string]json.RawMessage
Mtime time.Time
}{}
err := json.Unmarshal(data, &tmp)
@@ -78,7 +83,6 @@ func (s *Shares) UnmarshalJSON(data []byte) error {
return err
}
s.Mtime = tmp.Mtime
s.Shares = make(map[string]*collaboration.Share, len(tmp.Shares))
for id, genericShare := range tmp.Shares {
userShare := &collaboration.Share{
@@ -93,11 +97,20 @@ func (s *Shares) UnmarshalJSON(data []byte) error {
groupShare := &collaboration.Share{
Grantee: &provider.Grantee{Id: &provider.Grantee_GroupId{}},
}
err = json.Unmarshal(genericShare, groupShare) // try to unmarshal to a group share if the user share unmarshalling failed
if err != nil {
return err
err = json.Unmarshal(genericShare, groupShare) // is this a group share?
if err == nil && groupShare.Grantee.Type == provider.GranteeType_GRANTEE_TYPE_GROUP {
s.Shares[id] = groupShare
continue
}
s.Shares[id] = groupShare
invalidShare := &collaboration.Share{}
err = json.Unmarshal(genericShare, invalidShare) // invalid
if err == nil {
s.Shares[id] = invalidShare
continue
}
return err
}
return nil
@@ -115,26 +128,25 @@ func (c *Cache) LockSpace(spaceID string) func() {
// New returns a new Cache instance
func New(s metadata.Storage, ttl time.Duration) Cache {
return Cache{
Providers: map[string]*Spaces{},
Providers: mtimesyncedcache.Map[string, *Spaces]{},
storage: s,
ttl: ttl,
lockMap: sync.Map{},
}
}
func (c *Cache) isSpaceCached(storageID, spaceID string) bool {
spaces, ok := c.Providers.Load(storageID)
if !ok {
return false
}
_, ok = spaces.Spaces.Load(spaceID)
return ok
}
// Add adds a share to the cache
func (c *Cache) Add(ctx context.Context, storageID, spaceID, shareID string, share *collaboration.Share) error {
unlock := c.LockSpace(spaceID)
defer unlock()
if c.Providers[storageID] == nil || c.Providers[storageID].Spaces[spaceID] == nil {
err := c.syncWithLock(ctx, storageID, spaceID)
if err != nil {
return err
}
}
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Add")
ctx, span := tracer.Start(ctx, "Add")
defer span.End()
span.SetAttributes(attribute.String("cs3.storageid", storageID), attribute.String("cs3.spaceid", spaceID), attribute.String("cs3.shareid", shareID))
@@ -146,221 +158,320 @@ func (c *Cache) Add(ctx context.Context, storageID, spaceID, shareID string, sha
case shareID == "":
return fmt.Errorf("missing share id")
}
c.initializeIfNeeded(storageID, spaceID)
unlock := c.LockSpace(spaceID)
defer unlock()
span.AddEvent("got lock")
var err error
if !c.isSpaceCached(storageID, spaceID) {
err = c.syncWithLock(ctx, storageID, spaceID)
if err != nil {
return err
}
}
log := appctx.GetLogger(ctx).With().
Str("hostname", os.Getenv("HOSTNAME")).
Str("storageID", storageID).
Str("spaceID", spaceID).
Str("shareID", shareID).Logger()
persistFunc := func() error {
c.Providers[storageID].Spaces[spaceID].Shares[shareID] = share
spaces, _ := c.Providers.Load(storageID)
space, _ := spaces.Spaces.Load(spaceID)
log.Info().Interface("shares", maps.Keys(space.Shares)).Str("New share", shareID).Msg("Adding share to space")
space.Shares[shareID] = share
return c.Persist(ctx, storageID, spaceID)
}
err := persistFunc()
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
for retries := 100; retries > 0; retries-- {
err = persistFunc()
switch err.(type) {
case nil:
span.SetStatus(codes.Ok, "")
return nil
case errtypes.Aborted:
log.Debug().Msg("aborted when persisting added provider share: etag changed. retrying...")
// this is the expected status code from the server when the if-match etag check fails
// continue with sync below
case errtypes.PreconditionFailed:
log.Debug().Msg("precondition failed when persisting added provider share: etag changed. retrying...")
// actually, this is the wrong status code and we treat it like errtypes.Aborted because of inconsistencies on the server side
// continue with sync below
default:
span.SetStatus(codes.Error, fmt.Sprintf("persisting added provider share failed. giving up: %s", err.Error()))
log.Error().Err(err).Msg("persisting added provider share failed")
return err
}
if err := c.syncWithLock(ctx, storageID, spaceID); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.Error().Err(err).Msg("persisting added provider share failed. giving up.")
return err
}
err = persistFunc()
}
return err
}
// Remove removes a share from the cache
func (c *Cache) Remove(ctx context.Context, storageID, spaceID, shareID string) error {
ctx, span := tracer.Start(ctx, "Remove")
defer span.End()
span.SetAttributes(attribute.String("cs3.storageid", storageID), attribute.String("cs3.spaceid", spaceID), attribute.String("cs3.shareid", shareID))
unlock := c.LockSpace(spaceID)
defer unlock()
span.AddEvent("got lock")
if c.Providers[storageID] == nil || c.Providers[storageID].Spaces[spaceID] == nil {
if !c.isSpaceCached(storageID, spaceID) {
err := c.syncWithLock(ctx, storageID, spaceID)
if err != nil {
return err
}
}
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Remove")
defer span.End()
span.SetAttributes(attribute.String("cs3.storageid", storageID), attribute.String("cs3.spaceid", spaceID), attribute.String("cs3.shareid", shareID))
persistFunc := func() error {
if c.Providers[storageID] == nil ||
c.Providers[storageID].Spaces[spaceID] == nil {
spaces, ok := c.Providers.Load(storageID)
if !ok {
return nil
}
delete(c.Providers[storageID].Spaces[spaceID].Shares, shareID)
space, _ := spaces.Spaces.Load(spaceID)
if !ok {
return nil
}
delete(space.Shares, shareID)
return c.Persist(ctx, storageID, spaceID)
}
err := persistFunc()
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
if err := c.syncWithLock(ctx, storageID, spaceID); err != nil {
log := appctx.GetLogger(ctx).With().
Str("hostname", os.Getenv("HOSTNAME")).
Str("storageID", storageID).
Str("spaceID", spaceID).
Str("shareID", shareID).Logger()
var err error
for retries := 100; retries > 0; retries-- {
err = persistFunc()
switch err.(type) {
case nil:
span.SetStatus(codes.Ok, "")
return nil
case errtypes.Aborted:
log.Debug().Msg("aborted when persisting removed provider share: etag changed. retrying...")
// this is the expected status code from the server when the if-match etag check fails
// continue with sync below
case errtypes.PreconditionFailed:
log.Debug().Msg("precondition failed when persisting removed provider share: etag changed. retrying...")
// actually, this is the wrong status code and we treat it like errtypes.Aborted because of inconsistencies on the server side
// continue with sync below
default:
span.SetStatus(codes.Error, fmt.Sprintf("persisting removed provider share failed. giving up: %s", err.Error()))
log.Error().Err(err).Msg("persisting removed provider share failed")
return err
}
if err := c.syncWithLock(ctx, storageID, spaceID); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.Error().Err(err).Msg("persisting removed provider share failed. giving up.")
return err
}
err = persistFunc()
}
return err
}
// Get returns one entry from the cache
func (c *Cache) Get(ctx context.Context, storageID, spaceID, shareID string) (*collaboration.Share, error) {
// sync cache, maybe our data is outdated
err := c.Sync(ctx, storageID, spaceID)
if err != nil {
return nil, err
func (c *Cache) Get(ctx context.Context, storageID, spaceID, shareID string, skipSync bool) (*collaboration.Share, error) {
ctx, span := tracer.Start(ctx, "Get")
defer span.End()
span.SetAttributes(attribute.String("cs3.storageid", storageID), attribute.String("cs3.spaceid", spaceID), attribute.String("cs3.shareid", shareID))
unlock := c.LockSpace(spaceID)
defer unlock()
span.AddEvent("got lock")
if !skipSync {
// sync cache, maybe our data is outdated
err := c.syncWithLock(ctx, storageID, spaceID)
if err != nil {
return nil, err
}
}
if c.Providers[storageID] == nil ||
c.Providers[storageID].Spaces[spaceID] == nil {
spaces, ok := c.Providers.Load(storageID)
if !ok {
return nil, nil
}
return c.Providers[storageID].Spaces[spaceID].Shares[shareID], nil
space, ok := spaces.Spaces.Load(spaceID)
if !ok {
return nil, nil
}
return space.Shares[shareID], nil
}
// ListSpace returns the list of shares in a given space
func (c *Cache) ListSpace(ctx context.Context, storageID, spaceID string) (*Shares, error) {
ctx, span := tracer.Start(ctx, "ListSpace")
defer span.End()
span.SetAttributes(attribute.String("cs3.storageid", storageID), attribute.String("cs3.spaceid", spaceID))
unlock := c.LockSpace(spaceID)
defer unlock()
span.AddEvent("got lock")
// sync cache, maybe our data is outdated
err := c.Sync(ctx, storageID, spaceID)
err := c.syncWithLock(ctx, storageID, spaceID)
if err != nil {
return nil, err
}
if c.Providers[storageID] == nil || c.Providers[storageID].Spaces[spaceID] == nil {
spaces, ok := c.Providers.Load(storageID)
if !ok {
return &Shares{}, nil
}
return c.Providers[storageID].Spaces[spaceID], nil
space, ok := spaces.Spaces.Load(spaceID)
if !ok {
return &Shares{}, nil
}
shares := &Shares{
Shares: maps.Clone(space.Shares),
Etag: space.Etag,
}
return shares, nil
}
// Persist persists the data of one space
func (c *Cache) Persist(ctx context.Context, storageID, spaceID string) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "PersistWithTime")
ctx, span := tracer.Start(ctx, "Persist")
defer span.End()
span.SetAttributes(attribute.String("cs3.storageid", storageID), attribute.String("cs3.spaceid", spaceID))
if c.Providers[storageID] == nil || c.Providers[storageID].Spaces[spaceID] == nil {
span.SetStatus(codes.Ok, "no shares in provider or space")
spaces, ok := c.Providers.Load(storageID)
if !ok {
span.AddEvent("nothing to persist")
span.SetStatus(codes.Ok, "")
return nil
}
space, ok := spaces.Spaces.Load(spaceID)
if !ok {
span.AddEvent("nothing to persist")
span.SetStatus(codes.Ok, "")
return nil
}
span.SetAttributes(attribute.String("BeforeEtag", space.Etag))
log := appctx.GetLogger(ctx).With().Str("storageID", storageID).Str("spaceID", spaceID).Logger()
log = log.With().Str("BeforeEtag", space.Etag).Logger()
oldMtime := c.Providers[storageID].Spaces[spaceID].Mtime
c.Providers[storageID].Spaces[spaceID].Mtime = time.Now()
// FIXME there is a race when between this time now and the below Uploed another process also updates the file -> we need a lock
createdBytes, err := json.Marshal(c.Providers[storageID].Spaces[spaceID])
createdBytes, err := json.Marshal(space)
if err != nil {
c.Providers[storageID].Spaces[spaceID].Mtime = oldMtime
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
jsonPath := spaceJSONPath(storageID, spaceID)
if err := c.storage.MakeDirIfNotExist(ctx, path.Dir(jsonPath)); err != nil {
c.Providers[storageID].Spaces[spaceID].Mtime = oldMtime
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
if err = c.storage.Upload(ctx, metadata.UploadRequest{
Path: jsonPath,
Content: createdBytes,
IfUnmodifiedSince: oldMtime,
MTime: c.Providers[storageID].Spaces[spaceID].Mtime,
}); err != nil {
c.Providers[storageID].Spaces[spaceID].Mtime = oldMtime
span.SetAttributes(attribute.String("etag", space.Etag))
ur := metadata.UploadRequest{
Path: jsonPath,
Content: createdBytes,
IfMatchEtag: space.Etag,
}
// when there is no etag in memory make sure the file has not been created on the server, see https://www.rfc-editor.org/rfc/rfc9110#field.if-match
// > If the field value is "*", the condition is false if the origin server has a current representation for the target resource.
if space.Etag == "" {
ur.IfNoneMatch = []string{"*"}
log.Debug().Msg("setting IfNoneMatch to *")
} else {
log.Debug().Msg("setting IfMatchEtag")
}
res, err := c.storage.Upload(ctx, ur)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.Debug().Err(err).Msg("persisting provider cache failed")
return err
}
space.Etag = res.Etag
span.SetStatus(codes.Ok, "")
shares := []string{}
for _, s := range space.Shares {
shares = append(shares, s.GetId().GetOpaqueId())
}
log.Debug().Str("AfterEtag", space.Etag).Interface("Shares", shares).Msg("persisted provider cache")
return nil
}
// Sync updates the in-memory data with the data from the storage if it is outdated
func (c *Cache) Sync(ctx context.Context, storageID, spaceID string) error {
unlock := c.LockSpace(spaceID)
defer unlock()
return c.syncWithLock(ctx, storageID, spaceID)
}
func (c *Cache) syncWithLock(ctx context.Context, storageID, spaceID string) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Sync")
ctx, span := tracer.Start(ctx, "syncWithLock")
defer span.End()
span.SetAttributes(attribute.String("cs3.storageid", storageID), attribute.String("cs3.spaceid", spaceID))
c.initializeIfNeeded(storageID, spaceID)
log := appctx.GetLogger(ctx).With().Str("storageID", storageID).Str("spaceID", spaceID).Logger()
spaces, _ := c.Providers.Load(storageID)
space, _ := spaces.Spaces.Load(spaceID)
span.SetAttributes(attribute.String("cs3.storageid", storageID), attribute.String("cs3.spaceid", spaceID), attribute.String("etag", space.Etag))
log := appctx.GetLogger(ctx).With().Str("storageID", storageID).Str("spaceID", spaceID).Str("etag", space.Etag).Str("hostname", os.Getenv("HOSTNAME")).Logger()
var mtime time.Time
if c.Providers[storageID] != nil && c.Providers[storageID].Spaces[spaceID] != nil {
mtime = c.Providers[storageID].Spaces[spaceID].Mtime
if time.Now().Before(c.Providers[storageID].Spaces[spaceID].nextSync) {
span.AddEvent("skip sync")
span.SetStatus(codes.Ok, "")
return nil
}
c.Providers[storageID].Spaces[spaceID].nextSync = time.Now().Add(c.ttl)
} else {
mtime = time.Time{} // Set zero time so that data from storage always takes precedence
dlreq := metadata.DownloadRequest{
Path: spaceJSONPath(storageID, spaceID),
}
// when we know an etag, only download if it changed remotely
if space.Etag != "" {
dlreq.IfNoneMatch = []string{space.Etag}
}
jsonPath := spaceJSONPath(storageID, spaceID)
info, err := c.storage.Stat(ctx, jsonPath)
if err != nil {
if _, ok := err.(errtypes.NotFound); ok {
span.AddEvent("no file")
span.SetStatus(codes.Ok, "")
return nil // Nothing to sync against
}
if _, ok := err.(*os.PathError); ok {
span.AddEvent("no dir")
span.SetStatus(codes.Ok, "")
return nil // Nothing to sync against
}
span.SetStatus(codes.Error, fmt.Sprintf("Failed to stat the provider cache: %s", err.Error()))
log.Error().Err(err).Msg("Failed to stat the provider cache")
dlres, err := c.storage.Download(ctx, dlreq)
switch err.(type) {
case nil:
span.AddEvent("updating local cache")
case errtypes.NotFound:
span.SetStatus(codes.Ok, "")
return nil
case errtypes.NotModified:
span.SetStatus(codes.Ok, "")
return nil
default:
span.RecordError(err)
span.SetStatus(codes.Error, "downloading provider cache failed")
return err
}
// check mtime of /users/{userid}/created.json
if utils.TSToTime(info.Mtime).After(mtime) {
span.AddEvent("updating cache")
// - update cached list of created shares for the user in memory if changed
createdBlob, err := c.storage.SimpleDownload(ctx, jsonPath)
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf("Failed to download the provider cache: %s", err.Error()))
log.Error().Err(err).Msg("Failed to download the provider cache")
return err
}
newShares := &Shares{}
err = json.Unmarshal(createdBlob, newShares)
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf("Failed to unmarshal the provider cache: %s", err.Error()))
log.Error().Err(err).Msg("Failed to unmarshal the provider cache")
return err
}
newShares.Mtime = utils.TSToTime(info.Mtime)
c.initializeIfNeeded(storageID, spaceID)
c.Providers[storageID].Spaces[spaceID] = newShares
span.AddEvent("updating local cache")
newShares := &Shares{}
err = json.Unmarshal(dlres.Content, newShares)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "unmarshaling provider cache failed")
log.Error().Err(err).Msg("unmarshaling provider cache failed")
return err
}
newShares.Etag = dlres.Etag
spaces.Spaces.Store(spaceID, newShares)
span.SetStatus(codes.Ok, "")
return nil
}
func (c *Cache) initializeIfNeeded(storageID, spaceID string) {
if c.Providers[storageID] == nil {
c.Providers[storageID] = &Spaces{
Spaces: map[string]*Shares{},
}
}
if c.Providers[storageID].Spaces[spaceID] == nil {
c.Providers[storageID].Spaces[spaceID] = &Shares{
Shares: map[string]*collaboration.Share{},
}
}
spaces, _ := c.Providers.LoadOrStore(storageID, &Spaces{
Spaces: mtimesyncedcache.Map[string, *Shares]{},
})
_, _ = spaces.Spaces.LoadOrStore(spaceID, &Shares{
Shares: map[string]*collaboration.Share{},
})
}
func spaceJSONPath(storageID, spaceID string) string {

View File

@@ -22,6 +22,7 @@ import (
"context"
"encoding/json"
"fmt"
"os"
"path"
"path/filepath"
"sync"
@@ -32,7 +33,6 @@ import (
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/storage/utils/metadata"
"github.com/cs3org/reva/v2/pkg/utils"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
)
@@ -54,15 +54,13 @@ type Cache struct {
// Spaces holds the received shares of one user per space
type Spaces struct {
Mtime time.Time
Spaces map[string]*Space
nextSync time.Time
etag string
}
// Space holds the received shares of one user in one space
type Space struct {
Mtime time.Time
States map[string]*State
}
@@ -92,7 +90,10 @@ func (c *Cache) lockUser(userID string) func() {
// Add adds a new entry to the cache
func (c *Cache) Add(ctx context.Context, userID, spaceID string, rs *collaboration.ReceivedShare) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Grab lock")
unlock := c.lockUser(userID)
span.End()
span.SetAttributes(attribute.String("cs3.userid", userID))
defer unlock()
if c.ReceivedSpaces[userID] == nil {
@@ -102,22 +103,14 @@ func (c *Cache) Add(ctx context.Context, userID, spaceID string, rs *collaborati
}
}
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Add")
ctx, span = appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Add")
defer span.End()
span.SetAttributes(attribute.String("cs3.userid", userID), attribute.String("cs3.spaceid", spaceID))
persistFunc := func() error {
if c.ReceivedSpaces[userID] == nil {
c.ReceivedSpaces[userID] = &Spaces{
Spaces: map[string]*Space{},
}
}
if c.ReceivedSpaces[userID].Spaces[spaceID] == nil {
c.ReceivedSpaces[userID].Spaces[spaceID] = &Space{}
}
c.initializeIfNeeded(userID, spaceID)
receivedSpace := c.ReceivedSpaces[userID].Spaces[spaceID]
receivedSpace.Mtime = time.Now()
if receivedSpace.States == nil {
receivedSpace.States = map[string]*State{}
}
@@ -128,22 +121,51 @@ func (c *Cache) Add(ctx context.Context, userID, spaceID string, rs *collaborati
return c.persist(ctx, userID)
}
err := persistFunc()
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
log := appctx.GetLogger(ctx).With().
Str("hostname", os.Getenv("HOSTNAME")).
Str("userID", userID).
Str("spaceID", spaceID).Logger()
var err error
for retries := 100; retries > 0; retries-- {
err = persistFunc()
switch err.(type) {
case nil:
span.SetStatus(codes.Ok, "")
return nil
case errtypes.Aborted:
log.Debug().Msg("aborted when persisting added received share: etag changed. retrying...")
// this is the expected status code from the server when the if-match etag check fails
// continue with sync below
case errtypes.PreconditionFailed:
log.Debug().Msg("precondition failed when persisting added received share: etag changed. retrying...")
// actually, this is the wrong status code and we treat it like errtypes.Aborted because of inconsistencies on the server side
// continue with sync below
default:
span.SetStatus(codes.Error, fmt.Sprintf("persisting added received share failed. giving up: %s", err.Error()))
log.Error().Err(err).Msg("persisting added received share failed")
return err
}
if err := c.syncWithLock(ctx, userID); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.Error().Err(err).Msg("persisting added received share failed. giving up.")
return err
}
err = persistFunc()
}
return err
}
// Get returns one entry from the cache
func (c *Cache) Get(ctx context.Context, userID, spaceID, shareID string) (*State, error) {
err := c.Sync(ctx, userID)
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Grab lock")
unlock := c.lockUser(userID)
span.End()
span.SetAttributes(attribute.String("cs3.userid", userID))
defer unlock()
err := c.syncWithLock(ctx, userID)
if err != nil {
return nil, err
}
@@ -155,7 +177,10 @@ func (c *Cache) Get(ctx context.Context, userID, spaceID, shareID string) (*Stat
// Sync updates the in-memory data with the data from the storage if it is outdated
func (c *Cache) Sync(ctx context.Context, userID string) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Grab lock")
unlock := c.lockUser(userID)
span.End()
span.SetAttributes(attribute.String("cs3.userid", userID))
defer unlock()
return c.syncWithLock(ctx, userID)
@@ -168,52 +193,40 @@ func (c *Cache) syncWithLock(ctx context.Context, userID string) error {
log := appctx.GetLogger(ctx).With().Str("userID", userID).Logger()
var mtime time.Time
if c.ReceivedSpaces[userID] != nil {
if time.Now().Before(c.ReceivedSpaces[userID].nextSync) {
span.AddEvent("skip sync")
span.SetStatus(codes.Ok, "")
return nil
}
c.ReceivedSpaces[userID].nextSync = time.Now().Add(c.ttl)
mtime = c.ReceivedSpaces[userID].Mtime
} else {
mtime = time.Time{} // Set zero time so that data from storage always takes precedence
}
c.initializeIfNeeded(userID, "")
jsonPath := userJSONPath(userID)
info, err := c.storage.Stat(ctx, jsonPath) // TODO we only need the mtime ... use fieldmask to make the request cheaper
if err != nil {
if _, ok := err.(errtypes.NotFound); ok {
span.AddEvent("no file")
span.SetStatus(codes.Ok, "")
return nil // Nothing to sync against
}
span.SetStatus(codes.Error, fmt.Sprintf("Failed to stat the received share: %s", err.Error()))
log.Error().Err(err).Msg("Failed to stat the received share")
span.AddEvent("updating cache")
// - update cached list of created shares for the user in memory if changed
dlres, err := c.storage.Download(ctx, metadata.DownloadRequest{
Path: jsonPath,
IfNoneMatch: []string{c.ReceivedSpaces[userID].etag},
})
switch err.(type) {
case nil:
span.AddEvent("updating local cache")
case errtypes.NotFound:
span.SetStatus(codes.Ok, "")
return nil
case errtypes.NotModified:
span.SetStatus(codes.Ok, "")
return nil
default:
span.SetStatus(codes.Error, fmt.Sprintf("Failed to download the received share: %s", err.Error()))
log.Error().Err(err).Msg("Failed to download the received share")
return err
}
// check mtime of /users/{userid}/created.json
if utils.TSToTime(info.Mtime).After(mtime) {
span.AddEvent("updating cache")
// - update cached list of created shares for the user in memory if changed
createdBlob, err := c.storage.SimpleDownload(ctx, jsonPath)
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf("Failed to download the received share: %s", err.Error()))
log.Error().Err(err).Msg("Failed to download the received share")
return err
}
newSpaces := &Spaces{}
err = json.Unmarshal(createdBlob, newSpaces)
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf("Failed to unmarshal the received share: %s", err.Error()))
log.Error().Err(err).Msg("Failed to unmarshal the received share")
return err
}
newSpaces.Mtime = utils.TSToTime(info.Mtime)
c.ReceivedSpaces[userID] = newSpaces
newSpaces := &Spaces{}
err = json.Unmarshal(dlres.Content, newSpaces)
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf("Failed to unmarshal the received share: %s", err.Error()))
log.Error().Err(err).Msg("Failed to unmarshal the received share")
return err
}
newSpaces.etag = dlres.Etag
c.ReceivedSpaces[userID] = newSpaces
span.SetStatus(codes.Ok, "")
return nil
}
@@ -229,31 +242,32 @@ func (c *Cache) persist(ctx context.Context, userID string) error {
return nil
}
oldMtime := c.ReceivedSpaces[userID].Mtime
c.ReceivedSpaces[userID].Mtime = time.Now()
createdBytes, err := json.Marshal(c.ReceivedSpaces[userID])
if err != nil {
c.ReceivedSpaces[userID].Mtime = oldMtime
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
jsonPath := userJSONPath(userID)
if err := c.storage.MakeDirIfNotExist(ctx, path.Dir(jsonPath)); err != nil {
c.ReceivedSpaces[userID].Mtime = oldMtime
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
if err = c.storage.Upload(ctx, metadata.UploadRequest{
Path: jsonPath,
Content: createdBytes,
IfUnmodifiedSince: oldMtime,
MTime: c.ReceivedSpaces[userID].Mtime,
}); err != nil {
c.ReceivedSpaces[userID].Mtime = oldMtime
ur := metadata.UploadRequest{
Path: jsonPath,
Content: createdBytes,
IfMatchEtag: c.ReceivedSpaces[userID].etag,
}
// when there is no etag in memory make sure the file has not been created on the server, see https://www.rfc-editor.org/rfc/rfc9110#field.if-match
// > If the field value is "*", the condition is false if the origin server has a current representation for the target resource.
if c.ReceivedSpaces[userID].etag == "" {
ur.IfNoneMatch = []string{"*"}
}
_, err = c.storage.Upload(ctx, ur)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
@@ -265,3 +279,14 @@ func (c *Cache) persist(ctx context.Context, userID string) error {
func userJSONPath(userID string) string {
return filepath.Join("/users", userID, "received.json")
}
func (c *Cache) initializeIfNeeded(userID, spaceID string) {
if c.ReceivedSpaces[userID] == nil {
c.ReceivedSpaces[userID] = &Spaces{
Spaces: map[string]*Space{},
}
}
if spaceID != "" && c.ReceivedSpaces[userID].Spaces[spaceID] == nil {
c.ReceivedSpaces[userID].Spaces[spaceID] = &Space{}
}
}

View File

@@ -22,6 +22,7 @@ import (
"context"
"encoding/json"
"fmt"
"os"
"path"
"path/filepath"
"sync"
@@ -31,7 +32,6 @@ import (
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/shareid"
"github.com/cs3org/reva/v2/pkg/storage/utils/metadata"
"github.com/cs3org/reva/v2/pkg/utils"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
)
@@ -55,16 +55,14 @@ type Cache struct {
// UserShareCache holds the space/share map for one user
type UserShareCache struct {
Mtime time.Time
UserShares map[string]*SpaceShareIDs
nextSync time.Time
Etag string
}
// SpaceShareIDs holds the unique list of share ids for a space
type SpaceShareIDs struct {
Mtime time.Time
IDs map[string]struct{}
IDs map[string]struct{}
}
func (c *Cache) lockUser(userID string) func() {
@@ -89,7 +87,10 @@ func New(s metadata.Storage, namespace, filename string, ttl time.Duration) Cach
// Add adds a share to the cache
func (c *Cache) Add(ctx context.Context, userid, shareID string) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Grab lock")
unlock := c.lockUser(userid)
span.End()
span.SetAttributes(attribute.String("cs3.userid", userid))
defer unlock()
if c.UserShares[userid] == nil {
@@ -99,7 +100,7 @@ func (c *Cache) Add(ctx context.Context, userid, shareID string) error {
}
}
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Add")
ctx, span = appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Add")
defer span.End()
span.SetAttributes(attribute.String("cs3.userid", userid), attribute.String("cs3.shareid", shareID))
@@ -107,41 +108,54 @@ func (c *Cache) Add(ctx context.Context, userid, shareID string) error {
ssid := storageid + shareid.IDDelimiter + spaceid
persistFunc := func() error {
now := time.Now()
if c.UserShares[userid] == nil {
c.UserShares[userid] = &UserShareCache{
UserShares: map[string]*SpaceShareIDs{},
}
}
if c.UserShares[userid].UserShares[ssid] == nil {
c.UserShares[userid].UserShares[ssid] = &SpaceShareIDs{
IDs: map[string]struct{}{},
}
}
c.initializeIfNeeded(userid, ssid)
// add share id
c.UserShares[userid].UserShares[ssid].Mtime = now
c.UserShares[userid].UserShares[ssid].IDs[shareID] = struct{}{}
return c.Persist(ctx, userid)
}
err := persistFunc()
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
log := appctx.GetLogger(ctx).With().
Str("hostname", os.Getenv("HOSTNAME")).
Str("userID", userid).
Str("shareID", shareID).Logger()
var err error
for retries := 100; retries > 0; retries-- {
err = persistFunc()
switch err.(type) {
case nil:
span.SetStatus(codes.Ok, "")
return nil
case errtypes.Aborted:
log.Debug().Msg("aborted when persisting added share: etag changed. retrying...")
// this is the expected status code from the server when the if-match etag check fails
// continue with sync below
case errtypes.PreconditionFailed:
log.Debug().Msg("precondition failed when persisting added share: etag changed. retrying...")
// actually, this is the wrong status code and we treat it like errtypes.Aborted because of inconsistencies on the server side
// continue with sync below
default:
span.SetStatus(codes.Error, fmt.Sprintf("persisting added share failed. giving up: %s", err.Error()))
log.Error().Err(err).Msg("persisting added share failed")
return err
}
if err := c.syncWithLock(ctx, userid); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.Error().Err(err).Msg("persisting added share failed. giving up.")
return err
}
err = persistFunc()
// TODO try more often?
}
return err
}
// Remove removes a share for the given user
func (c *Cache) Remove(ctx context.Context, userid, shareID string) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Grab lock")
unlock := c.lockUser(userid)
span.End()
span.SetAttributes(attribute.String("cs3.userid", userid))
defer unlock()
if c.UserShares[userid] == nil {
@@ -151,7 +165,7 @@ func (c *Cache) Remove(ctx context.Context, userid, shareID string) error {
}
}
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Remove")
ctx, span = appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Remove")
defer span.End()
span.SetAttributes(attribute.String("cs3.userid", userid), attribute.String("cs3.shareid", shareID))
@@ -167,19 +181,42 @@ func (c *Cache) Remove(ctx context.Context, userid, shareID string) error {
if c.UserShares[userid].UserShares[ssid] != nil {
// remove share id
c.UserShares[userid].UserShares[ssid].Mtime = time.Now()
delete(c.UserShares[userid].UserShares[ssid].IDs, shareID)
}
return c.Persist(ctx, userid)
}
err := persistFunc()
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
if err := c.syncWithLock(ctx, userid); err != nil {
log := appctx.GetLogger(ctx).With().
Str("hostname", os.Getenv("HOSTNAME")).
Str("userID", userid).
Str("shareID", shareID).Logger()
var err error
for retries := 100; retries > 0; retries-- {
err = persistFunc()
switch err.(type) {
case nil:
span.SetStatus(codes.Ok, "")
return nil
case errtypes.Aborted:
log.Debug().Msg("aborted when persisting removed share: etag changed. retrying...")
// this is the expected status code from the server when the if-match etag check fails
// continue with sync below
case errtypes.PreconditionFailed:
log.Debug().Msg("precondition failed when persisting removed share: etag changed. retrying...")
// actually, this is the wrong status code and we treat it like errtypes.Aborted because of inconsistencies on the server side
// continue with sync below
default:
span.SetStatus(codes.Error, fmt.Sprintf("persisting removed share failed. giving up: %s", err.Error()))
log.Error().Err(err).Msg("persisting removed share failed")
return err
}
if err := c.syncWithLock(ctx, userid); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
err = persistFunc()
}
return err
@@ -187,7 +224,10 @@ func (c *Cache) Remove(ctx context.Context, userid, shareID string) error {
// List return the list of spaces/shares for the given user/group
func (c *Cache) List(ctx context.Context, userid string) (map[string]SpaceShareIDs, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Grab lock")
unlock := c.lockUser(userid)
span.End()
span.SetAttributes(attribute.String("cs3.userid", userid))
defer unlock()
if err := c.syncWithLock(ctx, userid); err != nil {
return nil, err
@@ -200,8 +240,7 @@ func (c *Cache) List(ctx context.Context, userid string) (map[string]SpaceShareI
for ssid, cached := range c.UserShares[userid].UserShares {
r[ssid] = SpaceShareIDs{
Mtime: cached.Mtime,
IDs: cached.IDs,
IDs: cached.IDs,
}
}
return r, nil
@@ -214,54 +253,44 @@ func (c *Cache) syncWithLock(ctx context.Context, userID string) error {
log := appctx.GetLogger(ctx).With().Str("userID", userID).Logger()
var mtime time.Time
// - do we have a cached list of created shares for the user in memory?
if usc := c.UserShares[userID]; usc != nil {
if time.Now().Before(c.UserShares[userID].nextSync) {
span.AddEvent("skip sync")
span.SetStatus(codes.Ok, "")
return nil
}
c.UserShares[userID].nextSync = time.Now().Add(c.ttl)
mtime = usc.Mtime
// - y: set If-Modified-Since header to only download if it changed
} else {
mtime = time.Time{} // Set zero time so that data from storage always takes precedence
}
c.initializeIfNeeded(userID, "")
userCreatedPath := c.userCreatedPath(userID)
info, err := c.storage.Stat(ctx, userCreatedPath)
if err != nil {
if _, ok := err.(errtypes.NotFound); ok {
span.AddEvent("no file")
span.SetStatus(codes.Ok, "")
return nil // Nothing to sync against
}
span.SetStatus(codes.Error, fmt.Sprintf("Failed to stat the share cache: %s", err.Error()))
log.Error().Err(err).Msg("Failed to stat the share cache")
span.AddEvent("updating cache")
// - update cached list of created shares for the user in memory if changed
dlreq := metadata.DownloadRequest{
Path: userCreatedPath,
}
if c.UserShares[userID].Etag != "" {
dlreq.IfNoneMatch = []string{c.UserShares[userID].Etag}
}
dlres, err := c.storage.Download(ctx, dlreq)
switch err.(type) {
case nil:
span.AddEvent("updating local cache")
case errtypes.NotFound:
span.SetStatus(codes.Ok, "")
return nil
case errtypes.NotModified:
span.SetStatus(codes.Ok, "")
return nil
default:
span.SetStatus(codes.Error, fmt.Sprintf("Failed to download the share cache: %s", err.Error()))
log.Error().Err(err).Msg("Failed to download the share cache")
return err
}
// check mtime of /users/{userid}/created.json
if utils.TSToTime(info.Mtime).After(mtime) {
span.AddEvent("updating cache")
// - update cached list of created shares for the user in memory if changed
createdBlob, err := c.storage.SimpleDownload(ctx, userCreatedPath)
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf("Failed to download the share cache: %s", err.Error()))
log.Error().Err(err).Msg("Failed to download the share cache")
return err
}
newShareCache := &UserShareCache{}
err = json.Unmarshal(createdBlob, newShareCache)
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf("Failed to unmarshal the share cache: %s", err.Error()))
log.Error().Err(err).Msg("Failed to unmarshal the share cache")
return err
}
newShareCache.Mtime = utils.TSToTime(info.Mtime)
c.UserShares[userID] = newShareCache
newShareCache := &UserShareCache{}
err = json.Unmarshal(dlres.Content, newShareCache)
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf("Failed to unmarshal the share cache: %s", err.Error()))
log.Error().Err(err).Msg("Failed to unmarshal the share cache")
return err
}
newShareCache.Etag = dlres.Etag
c.UserShares[userID] = newShareCache
span.SetStatus(codes.Ok, "")
return nil
}
@@ -272,35 +301,36 @@ func (c *Cache) Persist(ctx context.Context, userid string) error {
defer span.End()
span.SetAttributes(attribute.String("cs3.userid", userid))
oldMtime := c.UserShares[userid].Mtime
c.UserShares[userid].Mtime = time.Now()
createdBytes, err := json.Marshal(c.UserShares[userid])
if err != nil {
c.UserShares[userid].Mtime = oldMtime
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
jsonPath := c.userCreatedPath(userid)
if err := c.storage.MakeDirIfNotExist(ctx, path.Dir(jsonPath)); err != nil {
c.UserShares[userid].Mtime = oldMtime
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
if err = c.storage.Upload(ctx, metadata.UploadRequest{
Path: jsonPath,
Content: createdBytes,
IfUnmodifiedSince: oldMtime,
MTime: c.UserShares[userid].Mtime,
}); err != nil {
c.UserShares[userid].Mtime = oldMtime
ur := metadata.UploadRequest{
Path: jsonPath,
Content: createdBytes,
IfMatchEtag: c.UserShares[userid].Etag,
}
// when there is no etag in memory make sure the file has not been created on the server, see https://www.rfc-editor.org/rfc/rfc9110#field.if-match
// > If the field value is "*", the condition is false if the origin server has a current representation for the target resource.
if c.UserShares[userid].Etag == "" {
ur.IfNoneMatch = []string{"*"}
}
res, err := c.storage.Upload(ctx, ur)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
c.UserShares[userid].Etag = res.Etag
span.SetStatus(codes.Ok, "")
return nil
}
@@ -308,3 +338,16 @@ func (c *Cache) Persist(ctx context.Context, userid string) error {
func (c *Cache) userCreatedPath(userid string) string {
return filepath.Join("/", c.namespace, userid, c.filename)
}
func (c *Cache) initializeIfNeeded(userid, ssid string) {
if c.UserShares[userid] == nil {
c.UserShares[userid] = &UserShareCache{
UserShares: map[string]*SpaceShareIDs{},
}
}
if ssid != "" && c.UserShares[userid].UserShares[ssid] == nil {
c.UserShares[userid].UserShares[ssid] = &SpaceShareIDs{
IDs: map[string]struct{}{},
}
}
}

View File

@@ -42,6 +42,7 @@ import (
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/logger"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/rhttp/datatx/utils/download"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/cache"
"github.com/cs3org/reva/v2/pkg/storage/utils/chunking"
@@ -992,17 +993,17 @@ func (fs *Decomposedfs) Download(ctx context.Context, ref *provider.Reference) (
return fs.DownloadRevision(ctx, ref, ref.ResourceId.OpaqueId)
}
node, err := fs.lu.NodeFromResource(ctx, ref)
n, err := fs.lu.NodeFromResource(ctx, ref)
if err != nil {
return nil, errors.Wrap(err, "Decomposedfs: error resolving ref")
}
if !node.Exists {
err = errtypes.NotFound(filepath.Join(node.ParentID, node.Name))
if !n.Exists {
err = errtypes.NotFound(filepath.Join(n.ParentID, n.Name))
return nil, err
}
rp, err := fs.p.AssemblePermissions(ctx, node)
rp, err := fs.p.AssemblePermissions(ctx, n)
switch {
case err != nil:
return nil, err
@@ -1014,9 +1015,21 @@ func (fs *Decomposedfs) Download(ctx context.Context, ref *provider.Reference) (
return nil, errtypes.NotFound(f)
}
reader, err := fs.tp.ReadBlob(node)
mtime, err := n.GetMTime(ctx)
if err != nil {
return nil, errors.Wrap(err, "Decomposedfs: error download blob '"+node.ID+"'")
return nil, errors.Wrap(err, "Decomposedfs: error getting mtime for '"+n.ID+"'")
}
currentEtag, err := node.CalculateEtag(n, mtime)
if err != nil {
return nil, errors.Wrap(err, "Decomposedfs: error calculating etag for '"+n.ID+"'")
}
expectedEtag := download.EtagFromContext(ctx)
if currentEtag != expectedEtag {
return nil, errtypes.Aborted(fmt.Sprintf("file changed from etag %s to %s", expectedEtag, currentEtag))
}
reader, err := fs.tp.ReadBlob(n)
if err != nil {
return nil, errors.Wrap(err, "Decomposedfs: error download blob '"+n.ID+"'")
}
return reader, nil
}

View File

@@ -20,6 +20,7 @@ package decomposedfs
import (
"context"
"os"
"path/filepath"
"strings"
@@ -32,6 +33,7 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/rogpeppe/go-internal/lockedfile"
)
// DenyGrant denies access to a resource.
@@ -74,15 +76,14 @@ func (fs *Decomposedfs) DenyGrant(ctx context.Context, ref *provider.Reference,
func (fs *Decomposedfs) AddGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) (err error) {
log := appctx.GetLogger(ctx)
log.Debug().Interface("ref", ref).Interface("grant", g).Msg("AddGrant()")
grantNode, grant, err := fs.loadGrant(ctx, ref, g)
grantNode, unlockFunc, grant, err := fs.loadGrant(ctx, ref, g)
if err != nil {
return err
}
defer unlockFunc()
if grant != nil {
// grant exists -> go to UpdateGrant
// TODO: should we hard error in this case?
return fs.UpdateGrant(ctx, ref, g)
return errtypes.AlreadyExists(filepath.Join(grantNode.ParentID, grantNode.Name))
}
owner := grantNode.Owner()
@@ -171,10 +172,11 @@ func (fs *Decomposedfs) ListGrants(ctx context.Context, ref *provider.Reference)
// RemoveGrant removes a grant from resource
func (fs *Decomposedfs) RemoveGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) (err error) {
grantNode, grant, err := fs.loadGrant(ctx, ref, g)
grantNode, unlockFunc, grant, err := fs.loadGrant(ctx, ref, g)
if err != nil {
return err
}
defer unlockFunc()
if grant == nil {
return errtypes.NotFound("grant not found")
@@ -200,14 +202,7 @@ func (fs *Decomposedfs) RemoveGrant(ctx context.Context, ref *provider.Reference
return err
}
var attr string
if g.Grantee.Type == provider.GranteeType_GRANTEE_TYPE_GROUP {
attr = prefixes.GrantGroupAcePrefix + g.Grantee.GetGroupId().OpaqueId
} else {
attr = prefixes.GrantUserAcePrefix + g.Grantee.GetUserId().OpaqueId
}
if err = grantNode.RemoveXattr(ctx, attr); err != nil {
if err := grantNode.DeleteGrant(ctx, g, false); err != nil {
return err
}
@@ -243,10 +238,11 @@ func (fs *Decomposedfs) UpdateGrant(ctx context.Context, ref *provider.Reference
log := appctx.GetLogger(ctx)
log.Debug().Interface("ref", ref).Interface("grant", g).Msg("UpdateGrant()")
grantNode, grant, err := fs.loadGrant(ctx, ref, g)
grantNode, unlockFunc, grant, err := fs.loadGrant(ctx, ref, g)
if err != nil {
return err
}
defer unlockFunc()
if grant == nil {
// grant not found
@@ -273,34 +269,42 @@ func (fs *Decomposedfs) UpdateGrant(ctx context.Context, ref *provider.Reference
}
// checks if the given grant exists and returns it. Nil grant means it doesn't exist
func (fs *Decomposedfs) loadGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) (*node.Node, *provider.Grant, error) {
func (fs *Decomposedfs) loadGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) (*node.Node, func(), *provider.Grant, error) {
var unlockFunc func()
n, err := fs.lu.NodeFromResource(ctx, ref)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
if !n.Exists {
return nil, nil, errtypes.NotFound(filepath.Join(n.ParentID, n.Name))
return nil, nil, nil, errtypes.NotFound(filepath.Join(n.ParentID, n.Name))
}
f, err := lockedfile.OpenFile(fs.lu.MetadataBackend().LockfilePath(n.InternalPath()), os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
return nil, nil, nil, err
}
unlockFunc = func() { f.Close() }
grants, err := n.ListGrants(ctx)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
for _, grant := range grants {
switch grant.Grantee.GetType() {
case provider.GranteeType_GRANTEE_TYPE_USER:
if g.Grantee.GetUserId().GetOpaqueId() == grant.Grantee.GetUserId().GetOpaqueId() {
return n, grant, nil
return n, unlockFunc, grant, nil
}
case provider.GranteeType_GRANTEE_TYPE_GROUP:
if g.Grantee.GetGroupId().GetOpaqueId() == grant.Grantee.GetGroupId().GetOpaqueId() {
return n, grant, nil
return n, unlockFunc, grant, nil
}
}
}
return n, nil, nil
return n, unlockFunc, nil, nil
}
func (fs *Decomposedfs) storeGrant(ctx context.Context, n *node.Node, g *provider.Grant) error {
@@ -323,7 +327,10 @@ func (fs *Decomposedfs) storeGrant(ctx context.Context, n *node.Node, g *provide
// set the grant
e := ace.FromGrant(g)
principal, value := e.Marshal()
if err := n.SetXattr(ctx, prefixes.GrantPrefix+principal, value); err != nil {
attribs := node.Attributes{
prefixes.GrantPrefix + principal: value,
}
if err := n.SetXattrsWithContext(ctx, attribs, false); err != nil {
appctx.GetLogger(ctx).Error().Err(err).
Str("principal", principal).Msg("Could not set grant for principal")
return err

View File

@@ -295,7 +295,7 @@ func refFromCS3(b []byte) (*provider.Reference, error) {
// The optional filter function can be used to filter by attribute name, e.g. by checking a prefix
// For the source file, a shared lock is acquired.
// NOTE: target resource will be write locked!
func (lu *Lookup) CopyMetadata(ctx context.Context, src, target string, filter func(attributeName string) bool) (err error) {
func (lu *Lookup) CopyMetadata(ctx context.Context, src, target string, filter func(attributeName string, value []byte) (newValue []byte, copy bool), acquireTargetLock bool) (err error) {
// Acquire a read log on the source node
// write lock existing node before reading treesize or tree time
lock, err := lockedfile.OpenFile(lu.MetadataBackend().LockfilePath(src), os.O_RDONLY|os.O_CREATE, 0600)
@@ -315,14 +315,14 @@ func (lu *Lookup) CopyMetadata(ctx context.Context, src, target string, filter f
}
}()
return lu.CopyMetadataWithSourceLock(ctx, src, target, filter, lock)
return lu.CopyMetadataWithSourceLock(ctx, src, target, filter, lock, acquireTargetLock)
}
// CopyMetadataWithSourceLock copies all extended attributes from source to target.
// The optional filter function can be used to filter by attribute name, e.g. by checking a prefix
// For the source file, a matching lockedfile is required.
// NOTE: target resource will be write locked!
func (lu *Lookup) CopyMetadataWithSourceLock(ctx context.Context, sourcePath, targetPath string, filter func(attributeName string) bool, lockedSource *lockedfile.File) (err error) {
func (lu *Lookup) CopyMetadataWithSourceLock(ctx context.Context, sourcePath, targetPath string, filter func(attributeName string, value []byte) (newValue []byte, copy bool), lockedSource *lockedfile.File, acquireTargetLock bool) (err error) {
switch {
case lockedSource == nil:
return errors.New("no lock provided")
@@ -337,12 +337,16 @@ func (lu *Lookup) CopyMetadataWithSourceLock(ctx context.Context, sourcePath, ta
newAttrs := make(map[string][]byte, 0)
for attrName, val := range attrs {
if filter == nil || filter(attrName) {
newAttrs[attrName] = val
if filter != nil {
var ok bool
if val, ok = filter(attrName, val); !ok {
continue
}
}
newAttrs[attrName] = val
}
return lu.MetadataBackend().SetMultiple(ctx, targetPath, newAttrs, true)
return lu.MetadataBackend().SetMultiple(ctx, targetPath, newAttrs, acquireTargetLock)
}
// DetectBackendOnDisk returns the name of the metadata backend being used on disk

View File

@@ -74,7 +74,7 @@ func (fs *Decomposedfs) SetArbitraryMetadata(ctx context.Context, ref *provider.
if md.Metadata != nil {
if val, ok := md.Metadata["mtime"]; ok {
delete(md.Metadata, "mtime")
if err := n.SetMtimeString(val); err != nil {
if err := n.SetMtimeString(ctx, val); err != nil {
errs = append(errs, errors.Wrap(err, "could not set mtime"))
}
}
@@ -184,7 +184,7 @@ func (fs *Decomposedfs) UnsetArbitraryMetadata(ctx context.Context, ref *provide
continue
}
fa := fmt.Sprintf("%s:%s:%s@%s", prefixes.FavPrefix, utils.UserTypeToString(uid.GetType()), uid.GetOpaqueId(), uid.GetIdp())
if err := n.RemoveXattr(ctx, fa); err != nil {
if err := n.RemoveXattr(ctx, fa, true); err != nil {
if metadata.IsAttrUnset(err) {
continue // already gone, ignore
}
@@ -195,7 +195,7 @@ func (fs *Decomposedfs) UnsetArbitraryMetadata(ctx context.Context, ref *provide
errs = append(errs, errors.Wrap(err, "could not unset favorite flag"))
}
default:
if err = n.RemoveXattr(ctx, prefixes.MetadataPrefix+k); err != nil {
if err = n.RemoveXattr(ctx, prefixes.MetadataPrefix+k, true); err != nil {
if metadata.IsAttrUnset(err) {
continue // already gone, ignore
}

View File

@@ -120,8 +120,8 @@ func (b MessagePackBackend) SetMultiple(ctx context.Context, path string, attrib
}
// Remove an extended attribute key
func (b MessagePackBackend) Remove(ctx context.Context, path, key string) error {
return b.saveAttributes(ctx, path, nil, []string{key}, true)
func (b MessagePackBackend) Remove(ctx context.Context, path, key string, acquireLock bool) error {
return b.saveAttributes(ctx, path, nil, []string{key}, acquireLock)
}
// AllWithLockedSource reads all extended attributes from the given reader (if possible).

View File

@@ -46,7 +46,7 @@ type Backend interface {
List(ctx context.Context, path string) (attribs []string, err error)
Set(ctx context.Context, path, key string, val []byte) error
SetMultiple(ctx context.Context, path string, attribs map[string][]byte, acquireLock bool) error
Remove(ctx context.Context, path, key string) error
Remove(ctx context.Context, path, key string, acquireLock bool) error
Purge(path string) error
Rename(oldPath, newPath string) error
@@ -95,7 +95,7 @@ func (NullBackend) SetMultiple(ctx context.Context, path string, attribs map[str
}
// Remove removes an extended attribute key
func (NullBackend) Remove(ctx context.Context, path string, key string) error {
func (NullBackend) Remove(ctx context.Context, path string, key string, acquireLock bool) error {
return errUnconfiguredError
}

View File

@@ -66,6 +66,8 @@ const (
// that node.
PropagationAttr string = OcisPrefix + "propagation"
// we need mtime to keep mtime in sync with the metadata
MTimeAttr string = OcisPrefix + "mtime"
// the tree modification time of the tree below this node,
// propagated when synctime_accounting is true and
// user.ocis.propagation=1 is set

View File

@@ -146,12 +146,14 @@ func (XattrsBackend) SetMultiple(ctx context.Context, path string, attribs map[s
}
// Remove an extended attribute key
func (XattrsBackend) Remove(ctx context.Context, filePath string, key string) (err error) {
lockedFile, err := lockedfile.OpenFile(filePath+filelocks.LockFileSuffix, os.O_CREATE|os.O_WRONLY, 0600)
if err != nil {
return err
func (XattrsBackend) Remove(ctx context.Context, filePath string, key string, acquireLock bool) (err error) {
if acquireLock {
lockedFile, err := lockedfile.OpenFile(filePath+filelocks.LockFileSuffix, os.O_CREATE|os.O_WRONLY, 0600)
if err != nil {
return err
}
defer cleanupLockfile(lockedFile)
}
defer cleanupLockfile(lockedFile)
return xattr.Remove(filePath, key)
}

View File

@@ -97,7 +97,7 @@ func (m Migration0003) Migrate(migrator *Migrator) (Result, error) {
}
for k := range attribs {
err = xattrs.Remove(context.Background(), path, k)
err = xattrs.Remove(context.Background(), path, k, false)
if err != nil {
migrator.log.Debug().Err(err).Str("path", path).Msg("error removing xattr")
}

View File

@@ -511,16 +511,21 @@ func (n *Node) LockFilePath() string {
}
// CalculateEtag returns a hash of fileid + tmtime (or mtime)
func CalculateEtag(nodeID string, tmTime time.Time) (string, error) {
return calculateEtag(nodeID, tmTime)
func CalculateEtag(n *Node, tmTime time.Time) (string, error) {
return calculateEtag(n, tmTime)
}
// calculateEtag returns a hash of fileid + tmtime (or mtime)
func calculateEtag(nodeID string, tmTime time.Time) (string, error) {
func calculateEtag(n *Node, tmTime time.Time) (string, error) {
h := md5.New()
if _, err := io.WriteString(h, nodeID); err != nil {
if _, err := io.WriteString(h, n.ID); err != nil {
return "", err
}
/* TODO we could strengthen the etag by adding the blobid, but then all etags would change. we would need a legacy etag check as well
if _, err := io.WriteString(h, n.BlobID); err != nil {
return "", err
}
*/
if tb, err := tmTime.UTC().MarshalBinary(); err == nil {
if _, err := h.Write(tb); err != nil {
return "", err
@@ -532,19 +537,20 @@ func calculateEtag(nodeID string, tmTime time.Time) (string, error) {
}
// SetMtimeString sets the mtime and atime of a node to the unixtime parsed from the given string
func (n *Node) SetMtimeString(mtime string) error {
mt, err := parseMTime(mtime)
func (n *Node) SetMtimeString(ctx context.Context, mtime string) error {
mt, err := utils.MTimeToTime(mtime)
if err != nil {
return err
}
return n.SetMtime(mt)
return n.SetMtime(ctx, &mt)
}
// SetMtime sets the mtime and atime of a node
func (n *Node) SetMtime(mtime time.Time) error {
nodePath := n.InternalPath()
// updating mtime also updates atime
return os.Chtimes(nodePath, mtime, mtime)
// SetMTime writes the UTC mtime to the extended attributes or removes the attribute if nil is passed
func (n *Node) SetMtime(ctx context.Context, t *time.Time) (err error) {
if t == nil {
return n.RemoveXattr(ctx, prefixes.MTimeAttr, true)
}
return n.SetXattrString(ctx, prefixes.MTimeAttr, t.UTC().Format(time.RFC3339Nano))
}
// SetEtag sets the temporary etag of a node if it differs from the current etag
@@ -555,7 +561,7 @@ func (n *Node) SetEtag(ctx context.Context, val string) (err error) {
return
}
var etag string
if etag, err = calculateEtag(n.ID, tmTime); err != nil {
if etag, err = calculateEtag(n, tmTime); err != nil {
return
}
@@ -666,7 +672,7 @@ func (n *Node) AsResourceInfo(ctx context.Context, rp *provider.ResourcePermissi
// use temporary etag if it is set
if b, err := n.XattrString(ctx, prefixes.TmpEtagAttr); err == nil && b != "" {
ri.Etag = fmt.Sprintf(`"%x"`, b)
} else if ri.Etag, err = calculateEtag(n.ID, tmTime); err != nil {
} else if ri.Etag, err = calculateEtag(n, tmTime); err != nil {
sublog.Debug().Err(err).Msg("could not calculate etag")
}
@@ -883,22 +889,26 @@ func (n *Node) GetTMTime(ctx context.Context) (time.Time, error) {
}
// no tmtime, use mtime
return n.GetMTime()
return n.GetMTime(ctx)
}
// GetMTime reads the mtime from disk
func (n *Node) GetMTime() (time.Time, error) {
fi, err := os.Lstat(n.InternalPath())
// GetMTime reads the mtime from the extended attributes, falling back to disk
func (n *Node) GetMTime(ctx context.Context) (time.Time, error) {
b, err := n.XattrString(ctx, prefixes.MTimeAttr)
if err != nil {
return time.Time{}, err
fi, err := os.Lstat(n.InternalPath())
if err != nil {
return time.Time{}, err
}
return fi.ModTime(), nil
}
return fi.ModTime(), nil
return time.Parse(time.RFC3339Nano, b)
}
// SetTMTime writes the UTC tmtime to the extended attributes or removes the attribute if nil is passed
func (n *Node) SetTMTime(ctx context.Context, t *time.Time) (err error) {
if t == nil {
return n.RemoveXattr(ctx, prefixes.TreeMTimeAttr)
return n.RemoveXattr(ctx, prefixes.TreeMTimeAttr, true)
}
return n.SetXattrString(ctx, prefixes.TreeMTimeAttr, t.UTC().Format(time.RFC3339Nano))
}
@@ -915,7 +925,7 @@ func (n *Node) GetDTime(ctx context.Context) (tmTime time.Time, err error) {
// SetDTime writes the UTC dtime to the extended attributes or removes the attribute if nil is passed
func (n *Node) SetDTime(ctx context.Context, t *time.Time) (err error) {
if t == nil {
return n.RemoveXattr(ctx, prefixes.DTimeAttr)
return n.RemoveXattr(ctx, prefixes.DTimeAttr, true)
}
return n.SetXattrString(ctx, prefixes.DTimeAttr, t.UTC().Format(time.RFC3339Nano))
}
@@ -962,7 +972,14 @@ func (n *Node) SetChecksum(ctx context.Context, csType string, h hash.Hash) (err
// UnsetTempEtag removes the temporary etag attribute
func (n *Node) UnsetTempEtag(ctx context.Context) (err error) {
return n.RemoveXattr(ctx, prefixes.TmpEtagAttr)
return n.RemoveXattr(ctx, prefixes.TmpEtagAttr, true)
}
func isGrantExpired(g *provider.Grant) bool {
if g.Expiration == nil {
return false
}
return time.Now().After(time.Unix(int64(g.Expiration.Seconds), int64(g.Expiration.Nanos)))
}
// ReadUserPermissions will assemble the permissions for the current user on the given node without parent nodes
@@ -1017,6 +1034,10 @@ func (n *Node) ReadUserPermissions(ctx context.Context, u *userpb.User) (ap prov
continue
}
if isGrantExpired(g) {
continue
}
switch {
case err == nil:
// If all permissions are set to false we have a deny grant
@@ -1111,6 +1132,23 @@ func (n *Node) ReadGrant(ctx context.Context, grantee string) (g *provider.Grant
return e.Grant(), nil
}
// ReadGrant reads a CS3 grant
func (n *Node) DeleteGrant(ctx context.Context, g *provider.Grant, acquireLock bool) (err error) {
var attr string
if g.Grantee.Type == provider.GranteeType_GRANTEE_TYPE_GROUP {
attr = prefixes.GrantGroupAcePrefix + g.Grantee.GetGroupId().OpaqueId
} else {
attr = prefixes.GrantUserAcePrefix + g.Grantee.GetUserId().OpaqueId
}
if err = n.RemoveXattr(ctx, attr, acquireLock); err != nil {
return err
}
return nil
}
// ListGrants lists all grants of the current node.
func (n *Node) ListGrants(ctx context.Context) ([]*provider.Grant, error) {
grantees, err := n.ListGrantees(ctx)
@@ -1159,17 +1197,6 @@ func (n *Node) getGranteeTypes(ctx context.Context) []provider.GranteeType {
return types
}
func parseMTime(v string) (t time.Time, err error) {
p := strings.SplitN(v, ".", 2)
var sec, nsec int64
if sec, err = strconv.ParseInt(p[0], 10, 64); err == nil {
if len(p) > 1 {
nsec, err = strconv.ParseInt(p[1], 10, 64)
}
}
return time.Unix(sec, nsec), err
}
// FindStorageSpaceRoot calls n.Parent() and climbs the tree
// until it finds the space root node and adds it to the node
func (n *Node) FindStorageSpaceRoot(ctx context.Context) error {
@@ -1198,7 +1225,7 @@ func (n *Node) UnmarkProcessing(ctx context.Context, uploadID string) error {
// file started another postprocessing later - do not remove
return nil
}
return n.RemoveXattr(ctx, prefixes.StatusPrefix)
return n.RemoveXattr(ctx, prefixes.StatusPrefix, true)
}
// IsProcessing returns true if the node is currently being processed

View File

@@ -94,11 +94,11 @@ func (n *Node) SetXattrString(ctx context.Context, key, val string) (err error)
}
// RemoveXattr removes an extended attribute from the write-through cache/node
func (n *Node) RemoveXattr(ctx context.Context, key string) error {
func (n *Node) RemoveXattr(ctx context.Context, key string, acquireLock bool) error {
if n.xattrsCache != nil {
delete(n.xattrsCache, key)
}
return n.lu.MetadataBackend().Remove(ctx, n.InternalPath(), key)
return n.lu.MetadataBackend().Remove(ctx, n.InternalPath(), key, acquireLock)
}
// XattrsWithReader returns the extended attributes of the node. If the attributes have already

View File

@@ -33,6 +33,7 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/pkg/errors"
"github.com/rogpeppe/go-internal/lockedfile"
)
// Revision entries are stored inside the node folder and start with the same uuid as the current version.
@@ -90,7 +91,7 @@ func (fs *Decomposedfs) ListRevisions(ctx context.Context, ref *provider.Referen
appctx.GetLogger(ctx).Error().Err(err).Str("name", fi.Name()).Msg("error reading blobsize xattr, using 0")
}
rev.Size = uint64(blobSize)
etag, err := node.CalculateEtag(np, mtime)
etag, err := node.CalculateEtag(n, mtime)
if err != nil {
return nil, errors.Wrapf(err, "error calculating etag")
}
@@ -207,83 +208,98 @@ func (fs *Decomposedfs) RestoreRevision(ctx context.Context, ref *provider.Refer
return err
}
// write lock node before copying metadata
f, err := lockedfile.OpenFile(fs.lu.MetadataBackend().LockfilePath(n.InternalPath()), os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
return err
}
defer f.Close()
// move current version to new revision
nodePath := fs.lu.InternalPath(spaceID, kp[0])
var fi os.FileInfo
if fi, err = os.Stat(nodePath); err == nil {
// revisions are stored alongside the actual file, so a rename can be efficient and does not cross storage / partition boundaries
newRevisionPath := fs.lu.InternalPath(spaceID, kp[0]+node.RevisionIDDelimiter+fi.ModTime().UTC().Format(time.RFC3339Nano))
// touch new revision
if _, err := os.Create(newRevisionPath); err != nil {
return err
}
defer func() {
if returnErr != nil {
if err := os.Remove(newRevisionPath); err != nil {
log.Error().Err(err).Str("revision", filepath.Base(newRevisionPath)).Msg("could not clean up revision node")
}
if err := fs.lu.MetadataBackend().Purge(newRevisionPath); err != nil {
log.Error().Err(err).Str("revision", filepath.Base(newRevisionPath)).Msg("could not clean up revision node")
}
}
}()
// copy blob metadata from node to new revision node
err = fs.lu.CopyMetadata(ctx, nodePath, newRevisionPath, func(attributeName string) bool {
return strings.HasPrefix(attributeName, prefixes.ChecksumPrefix) || // for checksums
attributeName == prefixes.TypeAttr ||
attributeName == prefixes.BlobIDAttr ||
attributeName == prefixes.BlobsizeAttr
})
if err != nil {
return errtypes.InternalError("failed to copy blob xattrs to version node: " + err.Error())
}
// remember mtime from node as new revision mtime
if err = os.Chtimes(newRevisionPath, fi.ModTime(), fi.ModTime()); err != nil {
return errtypes.InternalError("failed to change mtime of version node")
}
// update blob id in node
// copy blob metadata from restored revision to node
restoredRevisionPath := fs.lu.InternalPath(spaceID, revisionKey)
err = fs.lu.CopyMetadata(ctx, restoredRevisionPath, nodePath, func(attributeName string) bool {
return strings.HasPrefix(attributeName, prefixes.ChecksumPrefix) ||
attributeName == prefixes.TypeAttr ||
attributeName == prefixes.BlobIDAttr ||
attributeName == prefixes.BlobsizeAttr
})
if err != nil {
return errtypes.InternalError("failed to copy blob xattrs to old revision to node: " + err.Error())
}
revisionSize, err := fs.lu.MetadataBackend().GetInt64(ctx, restoredRevisionPath, prefixes.BlobsizeAttr)
if err != nil {
return errtypes.InternalError("failed to read blob size xattr from old revision")
}
// drop old revision
if err := os.Remove(restoredRevisionPath); err != nil {
log.Warn().Err(err).Interface("ref", ref).Str("originalnode", kp[0]).Str("revisionKey", revisionKey).Msg("could not delete old revision, continuing")
}
// explicitly update mtime of node as writing xattrs does not change mtime
now := time.Now()
if err := os.Chtimes(nodePath, now, now); err != nil {
return errtypes.InternalError("failed to change mtime of version node")
}
// revision 5, current 10 (restore a smaller blob) -> 5-10 = -5
// revision 10, current 5 (restore a bigger blob) -> 10-5 = +5
sizeDiff := revisionSize - n.Blobsize
return fs.tp.Propagate(ctx, n, sizeDiff)
mtime, err := n.GetMTime(ctx)
if err != nil {
log.Error().Err(err).Interface("ref", ref).Str("originalnode", kp[0]).Str("revisionKey", revisionKey).Msg("cannot read mtime")
return err
}
log.Error().Err(err).Interface("ref", ref).Str("originalnode", kp[0]).Str("revisionKey", revisionKey).Msg("original node does not exist")
return nil
// revisions are stored alongside the actual file, so a rename can be efficient and does not cross storage / partition boundaries
newRevisionPath := fs.lu.InternalPath(spaceID, kp[0]+node.RevisionIDDelimiter+mtime.UTC().Format(time.RFC3339Nano))
// touch new revision
if _, err := os.Create(newRevisionPath); err != nil {
return err
}
defer func() {
if returnErr != nil {
if err := os.Remove(newRevisionPath); err != nil {
log.Error().Err(err).Str("revision", filepath.Base(newRevisionPath)).Msg("could not clean up revision node")
}
if err := fs.lu.MetadataBackend().Purge(newRevisionPath); err != nil {
log.Error().Err(err).Str("revision", filepath.Base(newRevisionPath)).Msg("could not clean up revision node")
}
}
}()
// copy blob metadata from node to new revision node
err = fs.lu.CopyMetadataWithSourceLock(ctx, nodePath, newRevisionPath, func(attributeName string, value []byte) (newValue []byte, copy bool) {
return value, strings.HasPrefix(attributeName, prefixes.ChecksumPrefix) || // for checksums
attributeName == prefixes.TypeAttr ||
attributeName == prefixes.BlobIDAttr ||
attributeName == prefixes.BlobsizeAttr ||
attributeName == prefixes.MTimeAttr // FIXME somewhere I mix up the revision time and the mtime, causing the restore to overwrite the other existing revisien
}, f, true)
if err != nil {
return errtypes.InternalError("failed to copy blob xattrs to version node: " + err.Error())
}
// remember mtime from node as new revision mtime
if err = os.Chtimes(newRevisionPath, mtime, mtime); err != nil {
return errtypes.InternalError("failed to change mtime of version node")
}
// update blob id in node
// copy blob metadata from restored revision to node
restoredRevisionPath := fs.lu.InternalPath(spaceID, revisionKey)
err = fs.lu.CopyMetadata(ctx, restoredRevisionPath, nodePath, func(attributeName string, value []byte) (newValue []byte, copy bool) {
if attributeName == prefixes.MTimeAttr {
// update mtime
return []byte(time.Now().UTC().Format(time.RFC3339Nano)), true
}
return value, strings.HasPrefix(attributeName, prefixes.ChecksumPrefix) ||
attributeName == prefixes.TypeAttr ||
attributeName == prefixes.BlobIDAttr ||
attributeName == prefixes.BlobsizeAttr
}, false)
if err != nil {
return errtypes.InternalError("failed to copy blob xattrs to old revision to node: " + err.Error())
}
revisionSize, err := fs.lu.MetadataBackend().GetInt64(ctx, restoredRevisionPath, prefixes.BlobsizeAttr)
if err != nil {
return errtypes.InternalError("failed to read blob size xattr from old revision")
}
// drop old revision
if err := os.Remove(restoredRevisionPath); err != nil {
log.Warn().Err(err).Interface("ref", ref).Str("originalnode", kp[0]).Str("revisionKey", revisionKey).Msg("could not delete old revision, continuing")
}
if err := os.Remove(fs.lu.MetadataBackend().MetadataPath(restoredRevisionPath)); err != nil {
log.Warn().Err(err).Interface("ref", ref).Str("originalnode", kp[0]).Str("revisionKey", revisionKey).Msg("could not delete old revision metadata, continuing")
}
if err := os.Remove(fs.lu.MetadataBackend().LockfilePath(restoredRevisionPath)); err != nil {
log.Warn().Err(err).Interface("ref", ref).Str("originalnode", kp[0]).Str("revisionKey", revisionKey).Msg("could not delete old revision metadata lockfile, continuing")
}
if err := fs.lu.MetadataBackend().Purge(restoredRevisionPath); err != nil {
log.Warn().Err(err).Interface("ref", ref).Str("originalnode", kp[0]).Str("revisionKey", revisionKey).Msg("could not purge old revision from cache, continuing")
}
// revision 5, current 10 (restore a smaller blob) -> 5-10 = -5
// revision 10, current 5 (restore a bigger blob) -> 10-5 = +5
sizeDiff := revisionSize - n.Blobsize
return fs.tp.Propagate(ctx, n, sizeDiff)
}
// DeleteRevision deletes the specified revision of the resource

View File

@@ -806,12 +806,14 @@ func (fs *Decomposedfs) storageSpaceFromNode(ctx context.Context, n *node.Node,
}
}
sublog := appctx.GetLogger(ctx).With().Str("space", n.SpaceRoot.ID).Logger()
var err error
// TODO apply more filters
var sname string
if sname, err = n.SpaceRoot.XattrString(ctx, prefixes.SpaceNameAttr); err != nil {
// FIXME: Is that a severe problem?
appctx.GetLogger(ctx).Debug().Err(err).Msg("space does not have a name attribute")
sublog.Debug().Err(err).Msg("space does not have a name attribute")
}
/*
@@ -846,15 +848,28 @@ func (fs *Decomposedfs) storageSpaceFromNode(ctx context.Context, n *node.Node,
// This way we don't have to have a cron job checking the grants in regular intervals.
// The tradeof obviously is that this code is here.
if isGrantExpired(g) {
err := fs.RemoveGrant(ctx, &provider.Reference{
ResourceId: &provider.ResourceId{
SpaceId: n.SpaceRoot.SpaceID,
OpaqueId: n.ID},
}, g)
appctx.GetLogger(ctx).Error().Err(err).
Str("space", n.SpaceRoot.ID).
Str("grantee", id).
Msg("failed to remove expired space grant")
if err := n.DeleteGrant(ctx, g, true); err != nil {
sublog.Error().Err(err).Str("grantee", id).
Msg("failed to delete expired space grant")
}
if n.IsSpaceRoot(ctx) {
// invalidate space grant
switch {
case g.Grantee.Type == provider.GranteeType_GRANTEE_TYPE_USER:
// remove from user index
if err := fs.userSpaceIndex.Remove(g.Grantee.GetUserId().GetOpaqueId(), n.SpaceID); err != nil {
sublog.Error().Err(err).Str("grantee", id).
Msg("failed to delete expired user space index")
}
case g.Grantee.Type == provider.GranteeType_GRANTEE_TYPE_GROUP:
// remove from group index
if err := fs.groupSpaceIndex.Remove(g.Grantee.GetGroupId().GetOpaqueId(), n.SpaceID); err != nil {
sublog.Error().Err(err).Str("grantee", id).
Msg("failed to delete expired group space index")
}
}
}
continue
}
grantExpiration[id] = g.Expiration
@@ -949,7 +964,7 @@ func (fs *Decomposedfs) storageSpaceFromNode(ctx context.Context, n *node.Node,
}
}
etag, err := node.CalculateEtag(n.ID, tmtime)
etag, err := node.CalculateEtag(n, tmtime)
if err != nil {
return nil, err
}

View File

@@ -149,7 +149,7 @@ func (t *Tree) TouchFile(ctx context.Context, n *node.Node, markprocessing bool,
attributes[prefixes.StatusPrefix] = []byte(node.ProcessingStatus)
}
if mtime != "" {
if err := n.SetMtimeString(mtime); err != nil {
if err := n.SetMtimeString(ctx, mtime); err != nil {
return errors.Wrap(err, "Decomposedfs: could not set mtime")
}
}
@@ -469,7 +469,7 @@ func (t *Tree) Delete(ctx context.Context, n *node.Node) (err error) {
trashLink := filepath.Join(t.options.Root, "spaces", lookup.Pathify(n.SpaceRoot.ID, 1, 2), "trash", lookup.Pathify(n.ID, 4, 2))
if err := os.MkdirAll(filepath.Dir(trashLink), 0700); err != nil {
// Roll back changes
_ = n.RemoveXattr(ctx, prefixes.TrashOriginAttr)
_ = n.RemoveXattr(ctx, prefixes.TrashOriginAttr, true)
return err
}
@@ -482,7 +482,7 @@ func (t *Tree) Delete(ctx context.Context, n *node.Node) (err error) {
err = os.Symlink("../../../../../nodes/"+lookup.Pathify(n.ID, 4, 2)+node.TrashIDDelimiter+deletionTime, trashLink)
if err != nil {
// Roll back changes
_ = n.RemoveXattr(ctx, prefixes.TrashOriginAttr)
_ = n.RemoveXattr(ctx, prefixes.TrashOriginAttr, true)
return
}
@@ -495,12 +495,12 @@ func (t *Tree) Delete(ctx context.Context, n *node.Node) (err error) {
// To roll back changes
// TODO remove symlink
// Roll back changes
_ = n.RemoveXattr(ctx, prefixes.TrashOriginAttr)
_ = n.RemoveXattr(ctx, prefixes.TrashOriginAttr, true)
return
}
err = t.lookup.MetadataBackend().Rename(nodePath, trashPath)
if err != nil {
_ = n.RemoveXattr(ctx, prefixes.TrashOriginAttr)
_ = n.RemoveXattr(ctx, prefixes.TrashOriginAttr, true)
_ = os.Rename(trashPath, nodePath)
return
}
@@ -514,7 +514,7 @@ func (t *Tree) Delete(ctx context.Context, n *node.Node) (err error) {
// TODO revert the rename
// TODO remove symlink
// Roll back changes
_ = n.RemoveXattr(ctx, prefixes.TrashOriginAttr)
_ = n.RemoveXattr(ctx, prefixes.TrashOriginAttr, true)
return
}

View File

@@ -189,8 +189,16 @@ func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Refere
return nil, errtypes.BadRequest("unsupported checksum algorithm: " + parts[0])
}
}
if ifMatch, ok := metadata["if-match"]; ok {
info.MetaData["if-match"] = ifMatch
// only check preconditions if they are not empty // TODO or is this a bad request?
if metadata["if-match"] != "" {
info.MetaData["if-match"] = metadata["if-match"]
}
if metadata["if-none-match"] != "" {
info.MetaData["if-none-match"] = metadata["if-none-match"]
}
if metadata["if-unmodified-since"] != "" {
info.MetaData["if-unmodified-since"] = metadata["if-unmodified-since"]
}
}

View File

@@ -168,6 +168,9 @@ func New(ctx context.Context, info tusd.FileInfo, lu *lookup.Lookup, tp Tree, p
info.Storage["NodeId"] = uuid.New().String()
info.Storage["NodeExists"] = "false"
}
if info.MetaData["if-none-match"] == "*" && info.Storage["NodeExists"] == "true" {
return nil, errtypes.Aborted(fmt.Sprintf("parent %s already has a child %s", n.ID, n.Name))
}
// Create binary file in the upload folder with no content
log.Debug().Interface("info", info).Msg("Decomposedfs: built storage info")
file, err := os.OpenFile(binPath, os.O_CREATE|os.O_WRONLY, defaultFilePerm)
@@ -276,10 +279,19 @@ func CreateNodeForUpload(upload *Upload, initAttrs node.Attributes) (*node.Node,
switch upload.Info.Storage["NodeExists"] {
case "false":
f, err = initNewNode(upload, n, uint64(fsize))
if f != nil {
appctx.GetLogger(upload.Ctx).Info().Str("lockfile", f.Name()).Interface("err", err).Msg("got lock file from initNewNode")
}
default:
f, err = updateExistingNode(upload, n, spaceID, uint64(fsize))
if f != nil {
appctx.GetLogger(upload.Ctx).Info().Str("lockfile", f.Name()).Interface("err", err).Msg("got lock file from updateExistingNode")
}
}
defer func() {
if f == nil {
return
}
if err := f.Close(); err != nil {
appctx.GetLogger(upload.Ctx).Error().Err(err).Str("nodeid", n.ID).Str("parentid", n.ParentID).Msg("could not close lock")
}
@@ -288,7 +300,17 @@ func CreateNodeForUpload(upload *Upload, initAttrs node.Attributes) (*node.Node,
return nil, err
}
mtime := time.Now()
if upload.Info.MetaData["mtime"] != "" {
// overwrite mtime if requested
mtime, err = utils.MTimeToTime(upload.Info.MetaData["mtime"])
if err != nil {
return nil, err
}
}
// overwrite technical information
initAttrs.SetString(prefixes.MTimeAttr, mtime.UTC().Format(time.RFC3339Nano))
initAttrs.SetInt64(prefixes.TypeAttr, int64(provider.ResourceType_RESOURCE_TYPE_FILE))
initAttrs.SetString(prefixes.ParentidAttr, n.ParentID)
initAttrs.SetString(prefixes.NameAttr, n.Name)
@@ -302,19 +324,8 @@ func CreateNodeForUpload(upload *Upload, initAttrs node.Attributes) (*node.Node,
return nil, errors.Wrap(err, "Decomposedfs: could not write metadata")
}
// overwrite mtime if requested
if upload.Info.MetaData["mtime"] != "" {
if err := n.SetMtimeString(upload.Info.MetaData["mtime"]); err != nil {
return nil, err
}
}
// add etag to metadata
tmtime, err := n.GetMTime()
if err != nil {
return nil, err
}
upload.Info.MetaData["etag"], _ = node.CalculateEtag(n.ID, tmtime)
upload.Info.MetaData["etag"], _ = node.CalculateEtag(n, mtime)
// update nodeid for later
upload.Info.Storage["NodeId"] = n.ID
@@ -338,7 +349,7 @@ func initNewNode(upload *Upload, n *node.Node, fsize uint64) (*lockedfile.File,
}
// we also need to touch the actual node file here it stores the mtime of the resource
h, err := os.OpenFile(n.InternalPath(), os.O_CREATE, 0600)
h, err := os.OpenFile(n.InternalPath(), os.O_CREATE|os.O_EXCL, 0600)
if err != nil {
return f, err
}
@@ -351,12 +362,18 @@ func initNewNode(upload *Upload, n *node.Node, fsize uint64) (*lockedfile.File,
// link child name to parent if it is new
childNameLink := filepath.Join(n.ParentPath(), n.Name)
relativeNodePath := filepath.Join("../../../../../", lookup.Pathify(n.ID, 4, 2))
log := appctx.GetLogger(upload.Ctx).With().Str("childNameLink", childNameLink).Str("relativeNodePath", relativeNodePath).Logger()
log.Info().Msg("initNewNode: creating symlink")
if err = os.Symlink(relativeNodePath, childNameLink); err != nil {
log.Info().Err(err).Msg("initNewNode: symlink failed")
if errors.Is(err, iofs.ErrExist) {
return nil, errtypes.AlreadyExists(n.Name)
log.Info().Err(err).Msg("initNewNode: symlink already exists")
return f, errtypes.AlreadyExists(n.Name)
}
return f, errors.Wrap(err, "Decomposedfs: could not symlink child entry")
}
log.Info().Msg("initNewNode: symlink created")
// on a new file the sizeDiff is the fileSize
upload.SizeDiff = int64(fsize)
@@ -365,67 +382,91 @@ func initNewNode(upload *Upload, n *node.Node, fsize uint64) (*lockedfile.File,
}
func updateExistingNode(upload *Upload, n *node.Node, spaceID string, fsize uint64) (*lockedfile.File, error) {
old, _ := node.ReadNode(upload.Ctx, upload.lu, spaceID, n.ID, false, nil, false)
if _, err := node.CheckQuota(upload.Ctx, n.SpaceRoot, true, uint64(old.Blobsize), fsize); err != nil {
targetPath := n.InternalPath()
// write lock existing node before reading any metadata
f, err := lockedfile.OpenFile(upload.lu.MetadataBackend().LockfilePath(targetPath), os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
return nil, err
}
tmtime, err := old.GetTMTime(upload.Ctx)
old, _ := node.ReadNode(upload.Ctx, upload.lu, spaceID, n.ID, false, nil, false)
if _, err := node.CheckQuota(upload.Ctx, n.SpaceRoot, true, uint64(old.Blobsize), fsize); err != nil {
return f, err
}
oldNodeMtime, err := old.GetMTime(upload.Ctx)
if err != nil {
return nil, err
return f, err
}
oldNodeEtag, err := node.CalculateEtag(old, oldNodeMtime)
if err != nil {
return f, err
}
// When the if-match header was set we need to check if the
// etag still matches before finishing the upload.
if ifMatch, ok := upload.Info.MetaData["if-match"]; ok {
targetEtag, err := node.CalculateEtag(n.ID, tmtime)
switch {
case err != nil:
return nil, errtypes.InternalError(err.Error())
case ifMatch != targetEtag:
return nil, errtypes.Aborted("etag mismatch")
if ifMatch != oldNodeEtag {
return f, errtypes.Aborted("etag mismatch")
}
}
upload.versionsPath = upload.lu.InternalPath(spaceID, n.ID+node.RevisionIDDelimiter+tmtime.UTC().Format(time.RFC3339Nano))
// When the if-none-match header was set we need to check if any of the
// etags matches before finishing the upload.
if ifNoneMatch, ok := upload.Info.MetaData["if-none-match"]; ok {
if ifNoneMatch == "*" {
return f, errtypes.Aborted("etag mismatch, resource exists")
}
for _, ifNoneMatchTag := range strings.Split(ifNoneMatch, ",") {
if ifNoneMatchTag == oldNodeEtag {
return f, errtypes.Aborted("etag mismatch")
}
}
}
// When the if-unmodified-since header was set we need to check if the
// etag still matches before finishing the upload.
if ifUnmodifiedSince, ok := upload.Info.MetaData["if-unmodified-since"]; ok {
if err != nil {
return f, errtypes.InternalError(fmt.Sprintf("failed to read mtime of node: %s", err))
}
ifUnmodifiedSince, err := time.Parse(time.RFC3339Nano, ifUnmodifiedSince)
if err != nil {
return f, errtypes.InternalError(fmt.Sprintf("failed to parse if-unmodified-since time: %s", err))
}
if oldNodeMtime.After(ifUnmodifiedSince) {
return f, errtypes.Aborted("if-unmodified-since mismatch")
}
}
upload.versionsPath = upload.lu.InternalPath(spaceID, n.ID+node.RevisionIDDelimiter+oldNodeMtime.UTC().Format(time.RFC3339Nano))
upload.SizeDiff = int64(fsize) - old.Blobsize
upload.Info.MetaData["versionsPath"] = upload.versionsPath
upload.Info.MetaData["sizeDiff"] = strconv.Itoa(int(upload.SizeDiff))
targetPath := n.InternalPath()
// write lock existing node before reading treesize or tree time
f, err := lockedfile.OpenFile(upload.lu.MetadataBackend().LockfilePath(targetPath), os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
return nil, err
}
// create version node
if _, err := os.Create(upload.versionsPath); err != nil {
return f, err
}
// copy blob metadata to version node
if err := upload.lu.CopyMetadataWithSourceLock(upload.Ctx, targetPath, upload.versionsPath, func(attributeName string) bool {
return strings.HasPrefix(attributeName, prefixes.ChecksumPrefix) ||
if err := upload.lu.CopyMetadataWithSourceLock(upload.Ctx, targetPath, upload.versionsPath, func(attributeName string, value []byte) (newValue []byte, copy bool) {
return value, strings.HasPrefix(attributeName, prefixes.ChecksumPrefix) ||
attributeName == prefixes.TypeAttr ||
attributeName == prefixes.BlobIDAttr ||
attributeName == prefixes.BlobsizeAttr
}, f); err != nil {
attributeName == prefixes.BlobsizeAttr ||
attributeName == prefixes.MTimeAttr
}, f, true); err != nil {
return f, err
}
// keep mtime from previous version
if err := os.Chtimes(upload.versionsPath, tmtime, tmtime); err != nil {
if err := os.Chtimes(upload.versionsPath, oldNodeMtime, oldNodeMtime); err != nil {
return f, errtypes.InternalError(fmt.Sprintf("failed to change mtime of version node: %s", err))
}
// update mtime of current version
mtime := time.Now()
if err := os.Chtimes(n.InternalPath(), mtime, mtime); err != nil {
return nil, err
}
return f, nil
}

Some files were not shown because too many files have changed in this diff Show More