bump reva

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
This commit is contained in:
Jörn Friedrich Dreyer
2024-05-30 11:27:56 +02:00
parent 933b1eb76c
commit e4b826d1ae
302 changed files with 63004 additions and 434 deletions
@@ -101,20 +101,20 @@ func (c *config) init() {
}
}
type service struct {
type Service struct {
conf *config
storage storage.FS
Storage storage.FS
dataServerURL *url.URL
availableXS []*provider.ResourceChecksumPriority
}
func (s *service) Close() error {
return s.storage.Shutdown(context.Background())
func (s *Service) Close() error {
return s.Storage.Shutdown(context.Background())
}
func (s *service) UnprotectedEndpoints() []string { return []string{} }
func (s *Service) UnprotectedEndpoints() []string { return []string{} }
func (s *service) Register(ss *grpc.Server) {
func (s *Service) Register(ss *grpc.Server) {
provider.RegisterProviderAPIServer(ss, s)
}
@@ -199,9 +199,9 @@ func New(m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {
return nil, err
}
service := &service{
service := &Service{
conf: c,
storage: fs,
Storage: fs,
dataServerURL: u,
availableXS: xsTypes,
}
@@ -209,20 +209,20 @@ func New(m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {
return service, nil
}
func (s *service) SetArbitraryMetadata(ctx context.Context, req *provider.SetArbitraryMetadataRequest) (*provider.SetArbitraryMetadataResponse, error) {
func (s *Service) SetArbitraryMetadata(ctx context.Context, req *provider.SetArbitraryMetadataRequest) (*provider.SetArbitraryMetadataResponse, error) {
ctx = ctxpkg.ContextSetLockID(ctx, req.LockId)
err := s.storage.SetArbitraryMetadata(ctx, req.Ref, req.ArbitraryMetadata)
err := s.Storage.SetArbitraryMetadata(ctx, req.Ref, req.ArbitraryMetadata)
return &provider.SetArbitraryMetadataResponse{
Status: status.NewStatusFromErrType(ctx, "set arbitrary metadata", err),
}, nil
}
func (s *service) UnsetArbitraryMetadata(ctx context.Context, req *provider.UnsetArbitraryMetadataRequest) (*provider.UnsetArbitraryMetadataResponse, error) {
func (s *Service) UnsetArbitraryMetadata(ctx context.Context, req *provider.UnsetArbitraryMetadataRequest) (*provider.UnsetArbitraryMetadataResponse, error) {
ctx = ctxpkg.ContextSetLockID(ctx, req.LockId)
err := s.storage.UnsetArbitraryMetadata(ctx, req.Ref, req.ArbitraryMetadataKeys)
err := s.Storage.UnsetArbitraryMetadata(ctx, req.Ref, req.ArbitraryMetadataKeys)
return &provider.UnsetArbitraryMetadataResponse{
Status: status.NewStatusFromErrType(ctx, "unset arbitrary metadata", err),
@@ -230,13 +230,13 @@ func (s *service) UnsetArbitraryMetadata(ctx context.Context, req *provider.Unse
}
// SetLock puts a lock on the given reference
func (s *service) SetLock(ctx context.Context, req *provider.SetLockRequest) (*provider.SetLockResponse, error) {
func (s *Service) SetLock(ctx context.Context, req *provider.SetLockRequest) (*provider.SetLockResponse, error) {
if !canLockPublicShare(ctx) {
return &provider.SetLockResponse{
Status: status.NewPermissionDenied(ctx, nil, "no permission to lock the share"),
}, nil
}
err := s.storage.SetLock(ctx, req.Ref, req.Lock)
err := s.Storage.SetLock(ctx, req.Ref, req.Lock)
return &provider.SetLockResponse{
Status: status.NewStatusFromErrType(ctx, "set lock", err),
@@ -244,8 +244,8 @@ func (s *service) SetLock(ctx context.Context, req *provider.SetLockRequest) (*p
}
// GetLock returns an existing lock on the given reference
func (s *service) GetLock(ctx context.Context, req *provider.GetLockRequest) (*provider.GetLockResponse, error) {
lock, err := s.storage.GetLock(ctx, req.Ref)
func (s *Service) GetLock(ctx context.Context, req *provider.GetLockRequest) (*provider.GetLockResponse, error) {
lock, err := s.Storage.GetLock(ctx, req.Ref)
return &provider.GetLockResponse{
Status: status.NewStatusFromErrType(ctx, "get lock", err),
@@ -254,14 +254,14 @@ func (s *service) GetLock(ctx context.Context, req *provider.GetLockRequest) (*p
}
// RefreshLock refreshes an existing lock on the given reference
func (s *service) RefreshLock(ctx context.Context, req *provider.RefreshLockRequest) (*provider.RefreshLockResponse, error) {
func (s *Service) RefreshLock(ctx context.Context, req *provider.RefreshLockRequest) (*provider.RefreshLockResponse, error) {
if !canLockPublicShare(ctx) {
return &provider.RefreshLockResponse{
Status: status.NewPermissionDenied(ctx, nil, "no permission to refresh the share lock"),
}, nil
}
err := s.storage.RefreshLock(ctx, req.Ref, req.Lock, req.ExistingLockId)
err := s.Storage.RefreshLock(ctx, req.Ref, req.Lock, req.ExistingLockId)
return &provider.RefreshLockResponse{
Status: status.NewStatusFromErrType(ctx, "refresh lock", err),
@@ -269,21 +269,21 @@ func (s *service) RefreshLock(ctx context.Context, req *provider.RefreshLockRequ
}
// Unlock removes an existing lock from the given reference
func (s *service) Unlock(ctx context.Context, req *provider.UnlockRequest) (*provider.UnlockResponse, error) {
func (s *Service) Unlock(ctx context.Context, req *provider.UnlockRequest) (*provider.UnlockResponse, error) {
if !canLockPublicShare(ctx) {
return &provider.UnlockResponse{
Status: status.NewPermissionDenied(ctx, nil, "no permission to unlock the share"),
}, nil
}
err := s.storage.Unlock(ctx, req.Ref, req.Lock)
err := s.Storage.Unlock(ctx, req.Ref, req.Lock)
return &provider.UnlockResponse{
Status: status.NewStatusFromErrType(ctx, "unlock", err),
}, nil
}
func (s *service) InitiateFileDownload(ctx context.Context, req *provider.InitiateFileDownloadRequest) (*provider.InitiateFileDownloadResponse, error) {
func (s *Service) InitiateFileDownload(ctx context.Context, req *provider.InitiateFileDownloadRequest) (*provider.InitiateFileDownloadResponse, error) {
// TODO(labkode): maybe add some checks before download starts? eg. check permissions?
// TODO(labkode): maybe add short-lived token?
// We now simply point the client to the data server.
@@ -329,7 +329,7 @@ func validateIfUnmodifiedSince(ifUnmodifiedSince *typesv1beta1.Timestamp, info *
}
}
func (s *service) InitiateFileUpload(ctx context.Context, req *provider.InitiateFileUploadRequest) (*provider.InitiateFileUploadResponse, error) {
func (s *Service) InitiateFileUpload(ctx context.Context, req *provider.InitiateFileUploadRequest) (*provider.InitiateFileUploadResponse, error) {
// TODO(labkode): same considerations as download
log := appctx.GetLogger(ctx)
if req.Ref.GetPath() == "/" {
@@ -412,7 +412,7 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate
metadata["expires"] = strconv.Itoa(int(expirationTimestamp.Seconds))
}
uploadIDs, err := s.storage.InitiateUpload(ctx, req.Ref, uploadLength, metadata)
uploadIDs, err := s.Storage.InitiateUpload(ctx, req.Ref, uploadLength, metadata)
if err != nil {
var st *rpc.Status
switch err.(type) {
@@ -477,9 +477,9 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate
return res, nil
}
func (s *service) GetPath(ctx context.Context, req *provider.GetPathRequest) (*provider.GetPathResponse, error) {
func (s *Service) GetPath(ctx context.Context, req *provider.GetPathRequest) (*provider.GetPathResponse, error) {
// TODO(labkode): check that the storage ID is the same as the storage provider id.
fn, err := s.storage.GetPathByID(ctx, req.ResourceId)
fn, err := s.Storage.GetPathByID(ctx, req.ResourceId)
if err != nil {
return &provider.GetPathResponse{
Status: status.NewStatusFromErrType(ctx, "get path", err),
@@ -492,17 +492,17 @@ func (s *service) GetPath(ctx context.Context, req *provider.GetPathRequest) (*p
return res, nil
}
func (s *service) GetHome(ctx context.Context, req *provider.GetHomeRequest) (*provider.GetHomeResponse, error) {
func (s *Service) GetHome(ctx context.Context, req *provider.GetHomeRequest) (*provider.GetHomeResponse, error) {
return nil, errtypes.NotSupported("unused, use the gateway to look up the user home")
}
func (s *service) CreateHome(ctx context.Context, req *provider.CreateHomeRequest) (*provider.CreateHomeResponse, error) {
func (s *Service) CreateHome(ctx context.Context, req *provider.CreateHomeRequest) (*provider.CreateHomeResponse, error) {
return nil, errtypes.NotSupported("use CreateStorageSpace with type personal")
}
// CreateStorageSpace creates a storage space
func (s *service) CreateStorageSpace(ctx context.Context, req *provider.CreateStorageSpaceRequest) (*provider.CreateStorageSpaceResponse, error) {
resp, err := s.storage.CreateStorageSpace(ctx, req)
func (s *Service) CreateStorageSpace(ctx context.Context, req *provider.CreateStorageSpaceRequest) (*provider.CreateStorageSpaceResponse, error) {
resp, err := s.Storage.CreateStorageSpace(ctx, req)
if err != nil {
var st *rpc.Status
switch err.(type) {
@@ -513,7 +513,7 @@ func (s *service) CreateStorageSpace(ctx context.Context, req *provider.CreateSt
case errtypes.NotSupported:
// if trying to create a user home fall back to CreateHome
if u, ok := ctxpkg.ContextGetUser(ctx); ok && req.Type == "personal" && utils.UserEqual(req.GetOwner().Id, u.Id) {
if err := s.storage.CreateHome(ctx); err != nil {
if err := s.Storage.CreateHome(ctx); err != nil {
st = status.NewInternal(ctx, "error creating home")
} else {
st = status.NewOK(ctx)
@@ -544,7 +544,7 @@ func (s *service) CreateStorageSpace(ctx context.Context, req *provider.CreateSt
return resp, nil
}
func (s *service) ListStorageSpaces(ctx context.Context, req *provider.ListStorageSpacesRequest) (*provider.ListStorageSpacesResponse, error) {
func (s *Service) ListStorageSpaces(ctx context.Context, req *provider.ListStorageSpacesRequest) (*provider.ListStorageSpacesResponse, error) {
log := appctx.GetLogger(ctx)
// TODO this is just temporary. Update the API to include this flag.
@@ -555,7 +555,7 @@ func (s *service) ListStorageSpaces(ctx context.Context, req *provider.ListStora
}
}
spaces, err := s.storage.ListStorageSpaces(ctx, req.Filters, unrestricted)
spaces, err := s.Storage.ListStorageSpaces(ctx, req.Filters, unrestricted)
if err != nil {
var st *rpc.Status
switch err.(type) {
@@ -593,8 +593,8 @@ func (s *service) ListStorageSpaces(ctx context.Context, req *provider.ListStora
}, nil
}
func (s *service) UpdateStorageSpace(ctx context.Context, req *provider.UpdateStorageSpaceRequest) (*provider.UpdateStorageSpaceResponse, error) {
res, err := s.storage.UpdateStorageSpace(ctx, req)
func (s *Service) UpdateStorageSpace(ctx context.Context, req *provider.UpdateStorageSpaceRequest) (*provider.UpdateStorageSpaceResponse, error) {
res, err := s.Storage.UpdateStorageSpace(ctx, req)
if err != nil {
appctx.GetLogger(ctx).
Error().
@@ -607,14 +607,14 @@ func (s *service) UpdateStorageSpace(ctx context.Context, req *provider.UpdateSt
return res, nil
}
func (s *service) DeleteStorageSpace(ctx context.Context, req *provider.DeleteStorageSpaceRequest) (*provider.DeleteStorageSpaceResponse, error) {
func (s *Service) DeleteStorageSpace(ctx context.Context, req *provider.DeleteStorageSpaceRequest) (*provider.DeleteStorageSpaceResponse, error) {
// we need to get the space before so we can return critical information
// FIXME: why is this string parsing necessary?
idraw, _ := storagespace.ParseID(req.Id.GetOpaqueId())
idraw.OpaqueId = idraw.GetSpaceId()
id := &provider.StorageSpaceId{OpaqueId: storagespace.FormatResourceID(idraw)}
spaces, err := s.storage.ListStorageSpaces(ctx, []*provider.ListStorageSpacesRequest_Filter{{Type: provider.ListStorageSpacesRequest_Filter_TYPE_ID, Term: &provider.ListStorageSpacesRequest_Filter_Id{Id: id}}}, true)
spaces, err := s.Storage.ListStorageSpaces(ctx, []*provider.ListStorageSpacesRequest_Filter{{Type: provider.ListStorageSpacesRequest_Filter_TYPE_ID, Term: &provider.ListStorageSpacesRequest_Filter_Id{Id: id}}}, true)
if err != nil {
var st *rpc.Status
switch err.(type) {
@@ -636,7 +636,7 @@ func (s *service) DeleteStorageSpace(ctx context.Context, req *provider.DeleteSt
}, nil
}
if err := s.storage.DeleteStorageSpace(ctx, req); err != nil {
if err := s.Storage.DeleteStorageSpace(ctx, req); err != nil {
var st *rpc.Status
switch err.(type) {
case errtypes.IsNotFound:
@@ -670,7 +670,7 @@ func (s *service) DeleteStorageSpace(ctx context.Context, req *provider.DeleteSt
return res, nil
}
func (s *service) CreateContainer(ctx context.Context, req *provider.CreateContainerRequest) (*provider.CreateContainerResponse, error) {
func (s *Service) CreateContainer(ctx context.Context, req *provider.CreateContainerRequest) (*provider.CreateContainerResponse, error) {
// FIXME these should be part of the CreateContainerRequest object
if req.Opaque != nil {
if e, ok := req.Opaque.Map["lockid"]; ok && e.Decoder == "plain" {
@@ -678,14 +678,14 @@ func (s *service) CreateContainer(ctx context.Context, req *provider.CreateConta
}
}
err := s.storage.CreateDir(ctx, req.Ref)
err := s.Storage.CreateDir(ctx, req.Ref)
return &provider.CreateContainerResponse{
Status: status.NewStatusFromErrType(ctx, "create container", err),
}, nil
}
func (s *service) TouchFile(ctx context.Context, req *provider.TouchFileRequest) (*provider.TouchFileResponse, error) {
func (s *Service) TouchFile(ctx context.Context, req *provider.TouchFileRequest) (*provider.TouchFileResponse, error) {
// FIXME these should be part of the TouchFileRequest object
var mtime string
if req.Opaque != nil {
@@ -695,14 +695,14 @@ func (s *service) TouchFile(ctx context.Context, req *provider.TouchFileRequest)
mtime = utils.ReadPlainFromOpaque(req.Opaque, "X-OC-Mtime")
}
err := s.storage.TouchFile(ctx, req.Ref, utils.ExistsInOpaque(req.Opaque, "markprocessing"), mtime)
err := s.Storage.TouchFile(ctx, req.Ref, utils.ExistsInOpaque(req.Opaque, "markprocessing"), mtime)
return &provider.TouchFileResponse{
Status: status.NewStatusFromErrType(ctx, "touch file", err),
}, nil
}
func (s *service) Delete(ctx context.Context, req *provider.DeleteRequest) (*provider.DeleteResponse, error) {
func (s *Service) Delete(ctx context.Context, req *provider.DeleteRequest) (*provider.DeleteResponse, error) {
if req.Ref.GetPath() == "/" {
return &provider.DeleteResponse{
Status: status.NewInternal(ctx, "can't delete mount path"),
@@ -720,7 +720,7 @@ func (s *service) Delete(ctx context.Context, req *provider.DeleteRequest) (*pro
}
}
md, err := s.storage.GetMD(ctx, req.Ref, []string{}, []string{"id", "status"})
md, err := s.Storage.GetMD(ctx, req.Ref, []string{}, []string{"id", "status"})
if err != nil {
return &provider.DeleteResponse{
Status: status.NewStatusFromErrType(ctx, "can't stat resource to delete", err),
@@ -741,7 +741,7 @@ func (s *service) Delete(ctx context.Context, req *provider.DeleteRequest) (*pro
}, nil
}
err = s.storage.Delete(ctx, req.Ref)
err = s.Storage.Delete(ctx, req.Ref)
return &provider.DeleteResponse{
Status: status.NewStatusFromErrType(ctx, "delete", err),
@@ -753,17 +753,17 @@ func (s *service) Delete(ctx context.Context, req *provider.DeleteRequest) (*pro
}, nil
}
func (s *service) Move(ctx context.Context, req *provider.MoveRequest) (*provider.MoveResponse, error) {
func (s *Service) Move(ctx context.Context, req *provider.MoveRequest) (*provider.MoveResponse, error) {
ctx = ctxpkg.ContextSetLockID(ctx, req.LockId)
err := s.storage.Move(ctx, req.Source, req.Destination)
err := s.Storage.Move(ctx, req.Source, req.Destination)
return &provider.MoveResponse{
Status: status.NewStatusFromErrType(ctx, "move", err),
}, nil
}
func (s *service) Stat(ctx context.Context, req *provider.StatRequest) (*provider.StatResponse, error) {
func (s *Service) Stat(ctx context.Context, req *provider.StatRequest) (*provider.StatResponse, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "stat")
defer span.End()
@@ -772,7 +772,7 @@ func (s *service) Stat(ctx context.Context, req *provider.StatRequest) (*provide
Value: attribute.StringValue(req.GetRef().String()),
})
md, err := s.storage.GetMD(ctx, req.GetRef(), req.GetArbitraryMetadataKeys(), req.GetFieldMask().GetPaths())
md, err := s.Storage.GetMD(ctx, req.GetRef(), req.GetArbitraryMetadataKeys(), req.GetFieldMask().GetPaths())
if err != nil {
return &provider.StatResponse{
Status: status.NewStatusFromErrType(ctx, "stat", err),
@@ -789,11 +789,11 @@ func (s *service) Stat(ctx context.Context, req *provider.StatRequest) (*provide
}, nil
}
func (s *service) ListContainerStream(req *provider.ListContainerStreamRequest, ss provider.ProviderAPI_ListContainerStreamServer) error {
func (s *Service) ListContainerStream(req *provider.ListContainerStreamRequest, ss provider.ProviderAPI_ListContainerStreamServer) error {
ctx := ss.Context()
log := appctx.GetLogger(ctx)
mds, err := s.storage.ListFolder(ctx, req.GetRef(), req.GetArbitraryMetadataKeys(), req.GetFieldMask().GetPaths())
mds, err := s.Storage.ListFolder(ctx, req.GetRef(), req.GetArbitraryMetadataKeys(), req.GetFieldMask().GetPaths())
if err != nil {
var st *rpc.Status
switch err.(type) {
@@ -836,8 +836,8 @@ func (s *service) ListContainerStream(req *provider.ListContainerStreamRequest,
return nil
}
func (s *service) ListContainer(ctx context.Context, req *provider.ListContainerRequest) (*provider.ListContainerResponse, error) {
mds, err := s.storage.ListFolder(ctx, req.GetRef(), req.GetArbitraryMetadataKeys(), req.GetFieldMask().GetPaths())
func (s *Service) ListContainer(ctx context.Context, req *provider.ListContainerRequest) (*provider.ListContainerResponse, error) {
mds, err := s.Storage.ListFolder(ctx, req.GetRef(), req.GetArbitraryMetadataKeys(), req.GetFieldMask().GetPaths())
res := &provider.ListContainerResponse{
Status: status.NewStatusFromErrType(ctx, "list container", err),
Infos: mds,
@@ -854,8 +854,8 @@ func (s *service) ListContainer(ctx context.Context, req *provider.ListContainer
return res, nil
}
func (s *service) ListFileVersions(ctx context.Context, req *provider.ListFileVersionsRequest) (*provider.ListFileVersionsResponse, error) {
revs, err := s.storage.ListRevisions(ctx, req.Ref)
func (s *Service) ListFileVersions(ctx context.Context, req *provider.ListFileVersionsRequest) (*provider.ListFileVersionsResponse, error) {
revs, err := s.Storage.ListRevisions(ctx, req.Ref)
sort.Sort(descendingMtime(revs))
@@ -865,22 +865,22 @@ func (s *service) ListFileVersions(ctx context.Context, req *provider.ListFileVe
}, nil
}
func (s *service) RestoreFileVersion(ctx context.Context, req *provider.RestoreFileVersionRequest) (*provider.RestoreFileVersionResponse, error) {
func (s *Service) RestoreFileVersion(ctx context.Context, req *provider.RestoreFileVersionRequest) (*provider.RestoreFileVersionResponse, error) {
ctx = ctxpkg.ContextSetLockID(ctx, req.LockId)
err := s.storage.RestoreRevision(ctx, req.Ref, req.Key)
err := s.Storage.RestoreRevision(ctx, req.Ref, req.Key)
return &provider.RestoreFileVersionResponse{
Status: status.NewStatusFromErrType(ctx, "restore file version", err),
}, nil
}
func (s *service) ListRecycleStream(req *provider.ListRecycleStreamRequest, ss provider.ProviderAPI_ListRecycleStreamServer) error {
func (s *Service) ListRecycleStream(req *provider.ListRecycleStreamRequest, ss provider.ProviderAPI_ListRecycleStreamServer) error {
ctx := ss.Context()
log := appctx.GetLogger(ctx)
key, itemPath := router.ShiftPath(req.Key)
items, err := s.storage.ListRecycle(ctx, req.Ref, key, itemPath)
items, err := s.Storage.ListRecycle(ctx, req.Ref, key, itemPath)
if err != nil {
var st *rpc.Status
switch err.(type) {
@@ -922,9 +922,9 @@ func (s *service) ListRecycleStream(req *provider.ListRecycleStreamRequest, ss p
return nil
}
func (s *service) ListRecycle(ctx context.Context, req *provider.ListRecycleRequest) (*provider.ListRecycleResponse, error) {
func (s *Service) ListRecycle(ctx context.Context, req *provider.ListRecycleRequest) (*provider.ListRecycleResponse, error) {
key, itemPath := router.ShiftPath(req.Key)
items, err := s.storage.ListRecycle(ctx, req.Ref, key, itemPath)
items, err := s.Storage.ListRecycle(ctx, req.Ref, key, itemPath)
if err != nil {
var st *rpc.Status
switch err.(type) {
@@ -957,12 +957,12 @@ func (s *service) ListRecycle(ctx context.Context, req *provider.ListRecycleRequ
return res, nil
}
func (s *service) RestoreRecycleItem(ctx context.Context, req *provider.RestoreRecycleItemRequest) (*provider.RestoreRecycleItemResponse, error) {
func (s *Service) RestoreRecycleItem(ctx context.Context, req *provider.RestoreRecycleItemRequest) (*provider.RestoreRecycleItemResponse, error) {
ctx = ctxpkg.ContextSetLockID(ctx, req.LockId)
// TODO(labkode): CRITICAL: fill recycle info with storage provider.
key, itemPath := router.ShiftPath(req.Key)
err := s.storage.RestoreRecycleItem(ctx, req.Ref, key, itemPath, req.RestoreRef)
err := s.Storage.RestoreRecycleItem(ctx, req.Ref, key, itemPath, req.RestoreRef)
res := &provider.RestoreRecycleItemResponse{
Status: status.NewStatusFromErrType(ctx, "restore recycle item", err),
@@ -970,7 +970,7 @@ func (s *service) RestoreRecycleItem(ctx context.Context, req *provider.RestoreR
return res, nil
}
func (s *service) PurgeRecycle(ctx context.Context, req *provider.PurgeRecycleRequest) (*provider.PurgeRecycleResponse, error) {
func (s *Service) PurgeRecycle(ctx context.Context, req *provider.PurgeRecycleRequest) (*provider.PurgeRecycleResponse, error) {
// FIXME these should be part of the PurgeRecycleRequest object
if req.Opaque != nil {
if e, ok := req.Opaque.Map["lockid"]; ok && e.Decoder == "plain" {
@@ -981,7 +981,7 @@ func (s *service) PurgeRecycle(ctx context.Context, req *provider.PurgeRecycleRe
// if a key was sent as opaque id purge only that item
key, itemPath := router.ShiftPath(req.Key)
if key != "" {
if err := s.storage.PurgeRecycleItem(ctx, req.Ref, key, itemPath); err != nil {
if err := s.Storage.PurgeRecycleItem(ctx, req.Ref, key, itemPath); err != nil {
st := status.NewStatusFromErrType(ctx, "error purging recycle item", err)
appctx.GetLogger(ctx).
Error().
@@ -994,7 +994,7 @@ func (s *service) PurgeRecycle(ctx context.Context, req *provider.PurgeRecycleRe
Status: st,
}, nil
}
} else if err := s.storage.EmptyRecycle(ctx, req.Ref); err != nil {
} else if err := s.Storage.EmptyRecycle(ctx, req.Ref); err != nil {
// otherwise try emptying the whole recycle bin
st := status.NewStatusFromErrType(ctx, "error emptying recycle", err)
appctx.GetLogger(ctx).
@@ -1015,8 +1015,8 @@ func (s *service) PurgeRecycle(ctx context.Context, req *provider.PurgeRecycleRe
return res, nil
}
func (s *service) ListGrants(ctx context.Context, req *provider.ListGrantsRequest) (*provider.ListGrantsResponse, error) {
grants, err := s.storage.ListGrants(ctx, req.Ref)
func (s *Service) ListGrants(ctx context.Context, req *provider.ListGrantsRequest) (*provider.ListGrantsResponse, error) {
grants, err := s.Storage.ListGrants(ctx, req.Ref)
if err != nil {
var st *rpc.Status
switch err.(type) {
@@ -1045,7 +1045,7 @@ func (s *service) ListGrants(ctx context.Context, req *provider.ListGrantsReques
return res, nil
}
func (s *service) DenyGrant(ctx context.Context, req *provider.DenyGrantRequest) (*provider.DenyGrantResponse, error) {
func (s *Service) DenyGrant(ctx context.Context, req *provider.DenyGrantRequest) (*provider.DenyGrantResponse, error) {
// check grantee type is valid
if req.Grantee.Type == provider.GranteeType_GRANTEE_TYPE_INVALID {
return &provider.DenyGrantResponse{
@@ -1053,7 +1053,7 @@ func (s *service) DenyGrant(ctx context.Context, req *provider.DenyGrantRequest)
}, nil
}
err := s.storage.DenyGrant(ctx, req.Ref, req.Grantee)
err := s.Storage.DenyGrant(ctx, req.Ref, req.Grantee)
if err != nil {
var st *rpc.Status
switch err.(type) {
@@ -1086,7 +1086,7 @@ func (s *service) DenyGrant(ctx context.Context, req *provider.DenyGrantRequest)
return res, nil
}
func (s *service) AddGrant(ctx context.Context, req *provider.AddGrantRequest) (*provider.AddGrantResponse, error) {
func (s *Service) AddGrant(ctx context.Context, req *provider.AddGrantRequest) (*provider.AddGrantResponse, error) {
ctx = ctxpkg.ContextSetLockID(ctx, req.LockId)
// TODO: update CS3 APIs
@@ -1109,14 +1109,14 @@ func (s *service) AddGrant(ctx context.Context, req *provider.AddGrantRequest) (
}, nil
}
err := s.storage.AddGrant(ctx, req.Ref, req.Grant)
err := s.Storage.AddGrant(ctx, req.Ref, req.Grant)
return &provider.AddGrantResponse{
Status: status.NewStatusFromErrType(ctx, "add grant", err),
}, nil
}
func (s *service) UpdateGrant(ctx context.Context, req *provider.UpdateGrantRequest) (*provider.UpdateGrantResponse, error) {
func (s *Service) UpdateGrant(ctx context.Context, req *provider.UpdateGrantRequest) (*provider.UpdateGrantResponse, error) {
// FIXME these should be part of the UpdateGrantRequest object
if req.Opaque != nil {
if e, ok := req.Opaque.Map["lockid"]; ok && e.Decoder == "plain" {
@@ -1144,14 +1144,14 @@ func (s *service) UpdateGrant(ctx context.Context, req *provider.UpdateGrantRequ
}, nil
}
err := s.storage.UpdateGrant(ctx, req.Ref, req.Grant)
err := s.Storage.UpdateGrant(ctx, req.Ref, req.Grant)
return &provider.UpdateGrantResponse{
Status: status.NewStatusFromErrType(ctx, "update grant", err),
}, nil
}
func (s *service) RemoveGrant(ctx context.Context, req *provider.RemoveGrantRequest) (*provider.RemoveGrantResponse, error) {
func (s *Service) RemoveGrant(ctx context.Context, req *provider.RemoveGrantRequest) (*provider.RemoveGrantResponse, error) {
ctx = ctxpkg.ContextSetLockID(ctx, req.LockId)
// check targetType is valid
@@ -1168,14 +1168,14 @@ func (s *service) RemoveGrant(ctx context.Context, req *provider.RemoveGrantRequ
ctx = context.WithValue(ctx, utils.SpaceGrant, struct{}{})
}
err := s.storage.RemoveGrant(ctx, req.Ref, req.Grant)
err := s.Storage.RemoveGrant(ctx, req.Ref, req.Grant)
return &provider.RemoveGrantResponse{
Status: status.NewStatusFromErrType(ctx, "remove grant", err),
}, nil
}
func (s *service) CreateReference(ctx context.Context, req *provider.CreateReferenceRequest) (*provider.CreateReferenceResponse, error) {
func (s *Service) CreateReference(ctx context.Context, req *provider.CreateReferenceRequest) (*provider.CreateReferenceResponse, error) {
log := appctx.GetLogger(ctx)
// parse uri is valid
@@ -1187,7 +1187,7 @@ func (s *service) CreateReference(ctx context.Context, req *provider.CreateRefer
}, nil
}
if err := s.storage.CreateReference(ctx, req.Ref.GetPath(), u); err != nil {
if err := s.Storage.CreateReference(ctx, req.Ref.GetPath(), u); err != nil {
var st *rpc.Status
switch err.(type) {
case errtypes.IsNotFound:
@@ -1212,14 +1212,14 @@ func (s *service) CreateReference(ctx context.Context, req *provider.CreateRefer
}, nil
}
func (s *service) CreateSymlink(ctx context.Context, req *provider.CreateSymlinkRequest) (*provider.CreateSymlinkResponse, error) {
func (s *Service) CreateSymlink(ctx context.Context, req *provider.CreateSymlinkRequest) (*provider.CreateSymlinkResponse, error) {
return &provider.CreateSymlinkResponse{
Status: status.NewUnimplemented(ctx, errtypes.NotSupported("CreateSymlink not implemented"), "CreateSymlink not implemented"),
}, nil
}
func (s *service) GetQuota(ctx context.Context, req *provider.GetQuotaRequest) (*provider.GetQuotaResponse, error) {
total, used, remaining, err := s.storage.GetQuota(ctx, req.Ref)
func (s *Service) GetQuota(ctx context.Context, req *provider.GetQuotaRequest) (*provider.GetQuotaResponse, error) {
total, used, remaining, err := s.Storage.GetQuota(ctx, req.Ref)
if err != nil {
var st *rpc.Status
switch err.(type) {
@@ -1257,7 +1257,7 @@ func (s *service) GetQuota(ctx context.Context, req *provider.GetQuotaRequest) (
return res, nil
}
func (s *service) addMissingStorageProviderID(resourceID *provider.ResourceId, spaceID *provider.StorageSpaceId) {
func (s *Service) addMissingStorageProviderID(resourceID *provider.ResourceId, spaceID *provider.StorageSpaceId) {
// The storage driver might set the mount ID by itself, in which case skip this step
if resourceID != nil && resourceID.GetStorageId() == "" {
resourceID.StorageId = s.conf.MountID
@@ -56,11 +56,12 @@ func init() {
// Config holds the config options for the HTTP appprovider service
type Config struct {
Prefix string `mapstructure:"prefix"`
GatewaySvc string `mapstructure:"gatewaysvc"`
Insecure bool `mapstructure:"insecure"`
WebBaseURI string `mapstructure:"webbaseuri"`
Web Web `mapstructure:"web"`
Prefix string `mapstructure:"prefix"`
GatewaySvc string `mapstructure:"gatewaysvc"`
Insecure bool `mapstructure:"insecure"`
WebBaseURI string `mapstructure:"webbaseuri"`
Web Web `mapstructure:"web"`
SecureViewApp string `mapstructure:"secure_view_app"`
}
// Web holds the config options for the URL parameters for Web
@@ -342,6 +343,16 @@ func (s *svc) handleList(w http.ResponseWriter, r *http.Request) {
}
res := filterAppsByUserAgent(listRes.MimeTypes, r.UserAgent())
// if app name or address matches the configured secure view app add that flag to the response
for _, mt := range res {
for _, app := range mt.AppProviders {
if app.Name == s.conf.SecureViewApp {
app.SecureView = true
}
}
}
js, err := json.Marshal(map[string]interface{}{"mime-types": res})
if err != nil {
writeError(w, r, appErrorServerError, "error marshalling JSON response", err)
@@ -545,22 +556,38 @@ func newOpenInWebResponse(baseURI string, params, staticParams map[string]string
return openInWebResponse{URI: uri.String()}, nil
}
func filterAppsByUserAgent(mimeTypes []*appregistry.MimeTypeInfo, userAgent string) []*appregistry.MimeTypeInfo {
// MimeTypeInfo wraps the appregistry.MimeTypeInfo to change the app providers to ProviderInfos with a secure view flag
type MimeTypeInfo struct {
appregistry.MimeTypeInfo
AppProviders []*ProviderInfo `json:"app_providers"`
}
// ProviderInfo wraps the appregistry.ProviderInfo to add a secure view flag
type ProviderInfo struct {
appregistry.ProviderInfo
// TODO make this part of the CS3 provider info
SecureView bool `json:"secure_view"`
}
// filterAppsByUserAgent rewrites the mime type info to only include apps that can be called by the user agent
// it also wraps the provider info to be able to add a secure view flag
func filterAppsByUserAgent(mimeTypes []*appregistry.MimeTypeInfo, userAgent string) []*MimeTypeInfo {
ua := ua.Parse(userAgent)
res := []*appregistry.MimeTypeInfo{}
res := []*MimeTypeInfo{}
for _, m := range mimeTypes {
apps := []*appregistry.ProviderInfo{}
apps := []*ProviderInfo{}
for _, p := range m.AppProviders {
p.Address = "" // address is internal only and not needed in the client
// apps are called by name, so if it has no name it cannot be called and should not be advertised
// also filter Desktop-only apps if ua is not Desktop
if p.Name != "" && (ua.Desktop || !p.DesktopOnly) {
apps = append(apps, p)
apps = append(apps, &ProviderInfo{ProviderInfo: *p})
}
}
if len(apps) > 0 {
m.AppProviders = apps
res = append(res, m)
mt := &MimeTypeInfo{MimeTypeInfo: *m}
mt.AppProviders = apps
res = append(res, mt)
}
}
return res
+1 -7
View File
@@ -82,7 +82,7 @@ func New(m map[string]interface{}, publisher events.Publisher) (datatx.DataTX, e
}
func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
composable, ok := fs.(composable)
composable, ok := fs.(storage.ComposableFS)
if !ok {
return nil, errtypes.NotSupported("file system does not support the tus protocol")
}
@@ -193,12 +193,6 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
return h, nil
}
// Composable is the interface that a struct needs to implement
// to be composable, so that it can support the TUS methods
type composable interface {
UseIn(composer *tusd.StoreComposer)
}
func setHeaders(fs storage.FS, w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
id := path.Base(r.URL.Path)
-1
View File
@@ -31,7 +31,6 @@ import (
_ "github.com/cs3org/reva/v2/pkg/storage/fs/nextcloud"
_ "github.com/cs3org/reva/v2/pkg/storage/fs/ocis"
_ "github.com/cs3org/reva/v2/pkg/storage/fs/owncloudsql"
_ "github.com/cs3org/reva/v2/pkg/storage/fs/posix"
_ "github.com/cs3org/reva/v2/pkg/storage/fs/s3"
_ "github.com/cs3org/reva/v2/pkg/storage/fs/s3ng"
// Add your own here
+25
View File
@@ -0,0 +1,25 @@
// Copyright 2018-2024 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
package loader
import (
// Load core storage filesystem backends.
_ "github.com/cs3org/reva/v2/pkg/storage/fs/posix"
// Add your own here
)
@@ -20,14 +20,10 @@ package blobstore
import (
"bufio"
"fmt"
"io"
"os"
"path/filepath"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/pkg/errors"
)
@@ -38,11 +34,6 @@ type Blobstore struct {
// New returns a new Blobstore
func New(root string) (*Blobstore, error) {
err := os.MkdirAll(root, 0700)
if err != nil {
return nil, err
}
return &Blobstore{
root: root,
}, nil
@@ -50,35 +41,21 @@ func New(root string) (*Blobstore, error) {
// Upload stores some data in the blobstore under the given key
func (bs *Blobstore) Upload(node *node.Node, source string) error {
dest, err := bs.path(node)
if err != nil {
return err
}
// ensure parent path exists
if err := os.MkdirAll(filepath.Dir(dest), 0700); err != nil {
return errors.Wrap(err, "Decomposedfs: oCIS blobstore: error creating parent folders for blob")
}
if err := os.Rename(source, dest); err == nil {
return nil
}
// Rename failed, file needs to be copied.
file, err := os.Open(source)
if err != nil {
return errors.Wrap(err, "Decomposedfs: oCIS blobstore: Can not open source file to upload")
}
defer file.Close()
f, err := os.OpenFile(dest, os.O_CREATE|os.O_WRONLY, 0700)
f, err := os.OpenFile(node.InternalPath(), os.O_CREATE|os.O_WRONLY, 0700)
if err != nil {
return errors.Wrapf(err, "could not open blob '%s' for writing", dest)
return errors.Wrapf(err, "could not open blob '%s' for writing", node.InternalPath())
}
w := bufio.NewWriter(f)
_, err = w.ReadFrom(file)
if err != nil {
return errors.Wrapf(err, "could not write blob '%s'", dest)
return errors.Wrapf(err, "could not write blob '%s'", node.InternalPath())
}
return w.Flush()
@@ -86,37 +63,14 @@ func (bs *Blobstore) Upload(node *node.Node, source string) error {
// Download retrieves a blob from the blobstore for reading
func (bs *Blobstore) Download(node *node.Node) (io.ReadCloser, error) {
dest, err := bs.path(node)
file, err := os.Open(node.InternalPath())
if err != nil {
return nil, err
}
file, err := os.Open(dest)
if err != nil {
return nil, errors.Wrapf(err, "could not read blob '%s'", dest)
return nil, errors.Wrapf(err, "could not read blob '%s'", node.InternalPath())
}
return file, nil
}
// Delete deletes a blob from the blobstore
func (bs *Blobstore) Delete(node *node.Node) error {
dest, err := bs.path(node)
if err != nil {
return err
}
if err := utils.RemoveItem(dest); err != nil {
return errors.Wrapf(err, "could not delete blob '%s'", dest)
}
return nil
}
func (bs *Blobstore) path(node *node.Node) (string, error) {
if node.BlobID == "" {
return "", fmt.Errorf("blobstore: BlobID is empty")
}
return filepath.Join(
bs.root,
filepath.Clean(filepath.Join(
"/", "spaces", lookup.Pathify(node.SpaceID, 1, 2), "blobs", lookup.Pathify(node.BlobID, 4, 2)),
),
), nil
}
+129 -40
View File
@@ -24,16 +24,20 @@ import (
"os"
"path/filepath"
"strings"
"syscall"
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/storage/fs/posix/options"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/usermapper"
"github.com/cs3org/reva/v2/pkg/storage/utils/templates"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/rogpeppe/go-internal/lockedfile"
"go.opentelemetry.io/otel"
@@ -43,24 +47,127 @@ import (
var tracer trace.Tracer
var _spaceTypePersonal = "personal"
var _spaceTypeProject = "project"
func init() {
tracer = otel.Tracer("github.com/cs3org/reva/pkg/storage/utils/decomposedfs/lookup")
}
// IDCache is a cache for node ids
type IDCache interface {
Get(ctx context.Context, spaceID, nodeID string) (string, bool)
Set(ctx context.Context, spaceID, nodeID, val string) error
}
// Lookup implements transformations from filepath to node and back
type Lookup struct {
Options *options.Options
IDCache IDCache
metadataBackend metadata.Backend
userMapper usermapper.Mapper
}
// New returns a new Lookup instance
func New(b metadata.Backend, o *options.Options) *Lookup {
return &Lookup{
func New(b metadata.Backend, um usermapper.Mapper, o *options.Options) *Lookup {
lu := &Lookup{
Options: o,
metadataBackend: b,
IDCache: NewStoreIDCache(&o.Options),
userMapper: um,
}
go func() {
_ = lu.WarmupIDCache(o.Root)
}()
return lu
}
// CacheID remembers the on-disk path for the given space/node id pair.
func (lu *Lookup) CacheID(ctx context.Context, spaceID, nodeID, val string) error {
	cache := lu.IDCache
	return cache.Set(ctx, spaceID, nodeID, val)
}
// GetCachedID looks up the cached on-disk path for the given space/node id
// pair. The boolean reports whether an entry was found.
func (lu *Lookup) GetCachedID(ctx context.Context, spaceID, nodeID string) (string, bool) {
	cache := lu.IDCache
	return cache.Get(ctx, spaceID, nodeID)
}
// WarmupIDCache walks the tree below root and fills the id cache with a
// spaceID/nodeID -> path entry for every node that carries an id attribute.
// Metadata read errors are treated as "no metadata" and the node is skipped
// (best effort); only Stat and user-scoping errors abort the walk.
func (lu *Lookup) WarmupIDCache(root string) error {
	// spaceID is carried over between walk iterations: children inherit the
	// space id of the last space root seen on the way down.
	spaceID := []byte("")
	var gid int

	return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}

		attribs, err := lu.metadataBackend.All(context.Background(), path)
		if err == nil {
			nodeSpaceID, ok := attribs[prefixes.SpaceIDAttr]
			if ok {
				spaceID = nodeSpaceID

				// set the uid and gid for the space
				// NOTE(review): unlike the fallback branch below, this branch
				// scopes the user even when UseSpaceGroups is disabled — TODO
				// confirm whether the UseSpaceGroups guard is missing here.
				fi, err := os.Stat(path)
				if err != nil {
					return err
				}
				sys := fi.Sys().(*syscall.Stat_t)
				gid = int(sys.Gid)
				_, err = lu.userMapper.ScopeUserByIds(-1, gid)
				if err != nil {
					return err
				}
			}

			if len(spaceID) == 0 {
				// No space id inherited yet: walk up towards the storage root
				// until an ancestor with a space id attribute is found.
				spaceCandidate := path
				for strings.HasPrefix(spaceCandidate, lu.Options.Root) {
					spaceID, err = lu.MetadataBackend().Get(context.Background(), spaceCandidate, prefixes.SpaceIDAttr)
					if err == nil {
						if lu.Options.UseSpaceGroups {
							// set the uid and gid for the space
							fi, err := os.Stat(spaceCandidate)
							if err != nil {
								return err
							}
							sys := fi.Sys().(*syscall.Stat_t)
							gid := int(sys.Gid)
							_, err = lu.userMapper.ScopeUserByIds(-1, gid)
							if err != nil {
								return err
							}
						}
						break
					}
					spaceCandidate = filepath.Dir(spaceCandidate)
				}
			}

			// Cache the node's path under its space/node id pair.
			id, ok := attribs[prefixes.IDAttr]
			if ok && len(spaceID) > 0 {
				_ = lu.IDCache.Set(context.Background(), string(spaceID), string(id), path)
			}
		}
		return nil
	})
}
// NodeIDFromParentAndName reads the node id of the child with the given name
// below parent from the child's metadata. It returns errtypes.NotFound when
// the child does not exist.
func (lu *Lookup) NodeIDFromParentAndName(ctx context.Context, parent *node.Node, name string) (string, error) {
	childPath := filepath.Join(parent.InternalPath(), name)
	rawID, err := lu.metadataBackend.Get(ctx, childPath, prefixes.IDAttr)
	switch {
	case err == nil:
		return string(rawID), nil
	case metadata.IsNotExist(err):
		return "", errtypes.NotFound(name)
	default:
		return "", err
	}
}
// MetadataBackend returns the metadata backend
@@ -118,17 +225,6 @@ func (lu *Lookup) TypeFromPath(ctx context.Context, path string) provider.Resour
return t
}
func (lu *Lookup) NodeIDFromParentAndName(ctx context.Context, parent *node.Node, name string) (string, error) {
id, err := lu.metadataBackend.Get(ctx, filepath.Join(parent.InternalPath(), name), prefixes.IDAttr)
if err != nil {
if metadata.IsNotExist(err) {
return "", errtypes.NotFound(name)
}
return "", err
}
return string(id), nil
}
// NodeFromResource takes in a request path or request id and converts it to a Node
func (lu *Lookup) NodeFromResource(ctx context.Context, ref *provider.Reference) (*node.Node, error) {
ctx, span := tracer.Start(ctx, "NodeFromResource")
@@ -272,11 +368,9 @@ func (lu *Lookup) InternalRoot() string {
// InternalPath returns the internal path for a given ID
func (lu *Lookup) InternalPath(spaceID, nodeID string) string {
return filepath.Join(lu.Options.Root, "spaces", Pathify(spaceID, 1, 2), "nodes", Pathify(nodeID, 4, 2))
}
path, _ := lu.IDCache.Get(context.Background(), spaceID, nodeID)
func (lu *Lookup) SpacePath(spaceID string) string {
return filepath.Join(lu.Options.Root, spaceID)
return path
}
// // ReferenceFromAttr returns a CS3 reference from xattr of a node.
@@ -358,30 +452,25 @@ func (lu *Lookup) CopyMetadataWithSourceLock(ctx context.Context, sourcePath, ta
// GenerateSpaceID generates a space id for the given space type and owner
func (lu *Lookup) GenerateSpaceID(spaceType string, owner *user.User) (string, error) {
switch spaceType {
case _spaceTypeProject:
return uuid.New().String(), nil
case _spaceTypePersonal:
return templates.WithUser(owner, lu.Options.UserLayout), nil
path := templates.WithUser(owner, lu.Options.UserLayout)
spaceID, err := lu.metadataBackend.Get(context.Background(), filepath.Join(lu.Options.Root, path), prefixes.IDAttr)
if err != nil {
if metadata.IsNotExist(err) || metadata.IsAttrUnset(err) {
return uuid.New().String(), nil
} else {
return "", err
}
}
resID, err := storagespace.ParseID(string(spaceID))
if err != nil {
return "", err
}
return resID.SpaceId, nil
default:
return "", fmt.Errorf("unsupported space type: %s", spaceType)
}
}
// DetectBackendOnDisk returns the name of the metadata backend being used on
// disk ("mpk", "ini" or the default "xattrs"). It probes the newest space it
// can find under root for a backend-specific metadata file of the space root.
func DetectBackendOnDisk(root string) string {
	matches, _ := filepath.Glob(filepath.Join(root, "spaces", "*", "*"))
	if len(matches) == 0 {
		return "xattrs"
	}

	base := matches[len(matches)-1]
	spaceid := strings.ReplaceAll(
		strings.TrimPrefix(base, filepath.Join(root, "spaces")),
		"/", "")
	spaceRoot := Pathify(spaceid, 4, 2)

	// Probe the space root's metadata file for each file-based backend.
	probes := []struct{ ext, backend string }{
		{".mpk", "mpk"},
		{".ini", "ini"},
	}
	for _, p := range probes {
		if _, err := os.Stat(filepath.Join(base, "nodes", spaceRoot+p.ext)); err == nil {
			return p.backend
		}
	}
	return "xattrs"
}
@@ -0,0 +1,69 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
package lookup
import (
"context"
microstore "go-micro.dev/v4/store"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
"github.com/cs3org/reva/v2/pkg/store"
)
// StoreIDCache is an id cache implementation backed by a go-micro store.
type StoreIDCache struct {
	// cache holds spaceID!nodeID -> path records.
	cache microstore.Store
}
// NewStoreIDCache returns a new StoreIDCache whose backing store is created
// from the given id-cache options (store type, TTL, size, nodes, auth, ...).
func NewStoreIDCache(o *options.Options) *StoreIDCache {
	return &StoreIDCache{
		cache: store.Create(
			store.Store(o.IDCache.Store),
			store.TTL(o.IDCache.TTL),
			store.Size(o.IDCache.Size),
			microstore.Nodes(o.IDCache.Nodes...),
			microstore.Database(o.IDCache.Database),
			microstore.Table(o.IDCache.Table),
			store.DisablePersistence(o.IDCache.DisablePersistence),
			store.Authentication(o.IDCache.AuthUsername, o.IDCache.AuthPassword),
		),
	}
}
// Set stores val under the given space and node id in the cache.
func (c *StoreIDCache) Set(_ context.Context, spaceID, nodeID, val string) error {
	record := &microstore.Record{
		Key:   cacheKey(spaceID, nodeID),
		Value: []byte(val),
	}
	return c.cache.Write(record)
}
// Get returns the cached value for the given space and node id. The boolean
// reports whether an entry was found.
func (c *StoreIDCache) Get(_ context.Context, spaceID, nodeID string) (string, bool) {
	records, err := c.cache.Read(cacheKey(spaceID, nodeID))
	// Guard against an empty record list: some store implementations return
	// no error together with zero records, which previously caused an
	// out-of-range panic on records[0].
	if err != nil || len(records) == 0 {
		return "", false
	}
	return string(records[0].Value), true
}
// cacheKey builds the store key for a space/node id pair.
func cacheKey(spaceid, nodeID string) string {
	const sep = "!"
	return spaceid + sep + nodeID
}
@@ -0,0 +1,52 @@
// Copyright 2018-2024 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
package options
import (
decomposedoptions "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
)
// Options holds the configuration of the posix storage driver. It embeds the
// decomposedfs options and adds posix specific settings.
type Options struct {
	decomposedoptions.Options

	// UseSpaceGroups enables scoping filesystem access to the space's group.
	UseSpaceGroups bool `mapstructure:"use_space_groups"`

	// WatchType selects the filesystem watcher implementation
	// (e.g. inotify, GPFS file audit log, GPFS watch folder).
	WatchType string `mapstructure:"watch_type"`
	// WatchPath is the path (or audit log file) the watcher observes.
	WatchPath string `mapstructure:"watch_path"`
	// WatchFolderKafkaBrokers lists the kafka brokers for the
	// GPFS watch folder watcher.
	WatchFolderKafkaBrokers string `mapstructure:"watch_folder_kafka_brokers"`
}
// New decodes the given configuration map into a posix Options instance.
// The embedded decomposedfs options are decoded separately and attached last.
func New(m map[string]interface{}) (*Options, error) {
	o := &Options{}
	if err := mapstructure.Decode(m, o); err != nil {
		return nil, errors.Wrap(err, "error decoding conf")
	}

	do, err := decomposedoptions.New(m)
	if err != nil {
		return nil, err
	}
	o.Options = *do

	return o, nil
}
+115 -11
View File
@@ -1,3 +1,6 @@
//go:build linux
// +build linux
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
@@ -19,9 +22,12 @@
package posix
import (
"context"
"fmt"
"path"
"os"
"syscall"
tusd "github.com/tus/tusd/pkg/handler"
microstore "go-micro.dev/v4/store"
"github.com/cs3org/reva/v2/pkg/events"
@@ -29,21 +35,31 @@ import (
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/fs/posix/blobstore"
"github.com/cs3org/reva/v2/pkg/storage/fs/posix/lookup"
"github.com/cs3org/reva/v2/pkg/storage/fs/posix/options"
"github.com/cs3org/reva/v2/pkg/storage/fs/posix/tree"
"github.com/cs3org/reva/v2/pkg/storage/fs/registry"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/aspects"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/permissions"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/usermapper"
"github.com/cs3org/reva/v2/pkg/storage/utils/middleware"
"github.com/cs3org/reva/v2/pkg/store"
"github.com/pkg/errors"
)
func init() {
registry.Register("posix", New)
}
// posixFS wraps the decomposedfs based storage.FS and keeps a reference to the
// user mapper so the tus/upload related interfaces can be forwarded to it.
type posixFS struct {
	storage.FS
	// um maps the process user to space owners/groups.
	um usermapper.Mapper
}
// New returns an implementation to of the storage.FS interface that talk to
// a local filesystem.
func New(m map[string]interface{}, stream events.Stream) (storage.FS, error) {
@@ -52,22 +68,24 @@ func New(m map[string]interface{}, stream events.Stream) (storage.FS, error) {
return nil, err
}
bs, err := blobstore.New(path.Join(o.Root))
bs, err := blobstore.New(o.Root)
if err != nil {
return nil, err
}
um := usermapper.NewUnixMapper()
var lu *lookup.Lookup
switch o.MetadataBackend {
case "xattrs":
lu = lookup.New(metadata.XattrsBackend{}, o)
lu = lookup.New(metadata.XattrsBackend{}, um, o)
case "messagepack":
lu = lookup.New(metadata.NewMessagePackBackend(o.Root, o.FileMetadataCache), o)
lu = lookup.New(metadata.NewMessagePackBackend(o.Root, o.FileMetadataCache), um, o)
default:
return nil, fmt.Errorf("unknown metadata backend %s, only 'messagepack' or 'xattrs' (default) supported", o.MetadataBackend)
}
tp := tree.New(lu, bs, o, store.Create(
tp, err := tree.New(lu, bs, um, o, store.Create(
store.Store(o.IDCache.Store),
store.TTL(o.IDCache.TTL),
store.Size(o.IDCache.Size),
@@ -77,6 +95,9 @@ func New(m map[string]interface{}, stream events.Stream) (storage.FS, error) {
store.DisablePersistence(o.IDCache.DisablePersistence),
store.Authentication(o.IDCache.AuthUsername, o.IDCache.AuthPassword),
))
if err != nil {
return nil, err
}
permissionsSelector, err := pool.PermissionsSelector(o.PermissionsSVC, pool.WithTLSMode(o.PermTLSMode))
if err != nil {
@@ -86,15 +107,98 @@ func New(m map[string]interface{}, stream events.Stream) (storage.FS, error) {
p := permissions.NewPermissions(node.NewPermissions(lu), permissionsSelector)
aspects := aspects.Aspects{
Lookup: lu,
Tree: tp,
Permissions: p,
EventStream: stream,
Lookup: lu,
Tree: tp,
Permissions: p,
EventStream: stream,
UserMapper: um,
DisableVersioning: true,
}
fs, err := decomposedfs.New(o, aspects)
dfs, err := decomposedfs.New(&o.Options, aspects)
if err != nil {
return nil, err
}
hooks := []middleware.Hook{}
if o.UseSpaceGroups {
resolveSpaceHook := func(methodName string, ctx context.Context, spaceID string) (context.Context, middleware.UnHook, error) {
if spaceID == "" {
return ctx, nil, nil
}
spaceRoot := lu.InternalPath(spaceID, spaceID)
fi, err := os.Stat(spaceRoot)
if err != nil {
return ctx, nil, err
}
ctx = context.WithValue(ctx, decomposedfs.CtxKeySpaceGID, fi.Sys().(*syscall.Stat_t).Gid)
return ctx, nil, err
}
scopeSpaceGroupHook := func(methodName string, ctx context.Context, spaceID string) (context.Context, middleware.UnHook, error) {
spaceGID, ok := ctx.Value(decomposedfs.CtxKeySpaceGID).(uint32)
if !ok {
return ctx, nil, nil
}
unscope, err := um.ScopeUserByIds(-1, int(spaceGID))
if err != nil {
return ctx, nil, errors.Wrap(err, "failed to scope user")
}
return ctx, unscope, nil
}
hooks = append(hooks, resolveSpaceHook, scopeSpaceGroupHook)
}
mw := middleware.NewFS(dfs, hooks...)
fs := &posixFS{
FS: mw,
um: um,
}
return fs, nil
}
// ListUploadSessions returns the upload sessions matching the given filter.
func (fs *posixFS) ListUploadSessions(ctx context.Context, filter storage.UploadSessionFilter) ([]storage.UploadSession, error) {
	lister := fs.FS.(storage.UploadSessionLister)
	return lister.ListUploadSessions(ctx, filter)
}
// UseIn tells the tus upload middleware which extensions it supports.
func (fs *posixFS) UseIn(composer *tusd.StoreComposer) {
	cfs := fs.FS.(storage.ComposableFS)
	cfs.UseIn(composer)
}
// NewUpload returns a new tus Upload instance.
func (fs *posixFS) NewUpload(ctx context.Context, info tusd.FileInfo) (upload tusd.Upload, err error) {
	ds := fs.FS.(tusd.DataStore)
	return ds.NewUpload(ctx, info)
}
// GetUpload returns the tus Upload with the given id.
func (fs *posixFS) GetUpload(ctx context.Context, id string) (upload tusd.Upload, err error) {
	ds := fs.FS.(tusd.DataStore)
	return ds.GetUpload(ctx, id)
}
// AsTerminatableUpload returns a TerminatableUpload.
// To implement the termination extension as specified in https://tus.io/protocols/resumable-upload.html#termination
// the storage needs to implement AsTerminatableUpload.
func (fs *posixFS) AsTerminatableUpload(up tusd.Upload) tusd.TerminatableUpload {
	session := up.(*upload.OcisSession)
	return session
}
// AsLengthDeclarableUpload returns a LengthDeclarableUpload.
// To implement the creation-defer-length extension as specified in https://tus.io/protocols/resumable-upload.html#creation
// the storage needs to implement AsLengthDeclarableUpload.
func (fs *posixFS) AsLengthDeclarableUpload(up tusd.Upload) tusd.LengthDeclarableUpload {
	session := up.(*upload.OcisSession)
	return session
}
// AsConcatableUpload returns a ConcatableUpload.
// To implement the concatenation extension as specified in https://tus.io/protocols/resumable-upload.html#concatenation
// the storage needs to implement AsConcatableUpload.
func (fs *posixFS) AsConcatableUpload(up tusd.Upload) tusd.ConcatableUpload {
	session := up.(*upload.OcisSession)
	return session
}
@@ -0,0 +1,277 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
package tree
import (
"context"
"fmt"
"io/fs"
"os"
"path/filepath"
"strings"
"sync"
"syscall"
"time"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/storage/fs/posix/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
)
// ScanDebouncer debounces scan requests per path: the scan function f fires
// only after `after` has elapsed without another request for the same path.
type ScanDebouncer struct {
	// after is the quiet period before f fires.
	after time.Duration
	// f is invoked (from a timer goroutine) for each debounced item.
	f func(item scanItem)
	// pending holds the active timer per path; guarded by mutex.
	pending map[string]*time.Timer
	// inProgress marks paths whose f is currently running.
	inProgress sync.Map

	mutex sync.Mutex
}
// NewScanDebouncer returns a new ScanDebouncer that invokes f after d of
// quiet time per path.
func NewScanDebouncer(d time.Duration, f func(item scanItem)) *ScanDebouncer {
	debouncer := &ScanDebouncer{
		after:      d,
		f:          f,
		pending:    make(map[string]*time.Timer),
		inProgress: sync.Map{},
	}
	return debouncer
}
// Debounce restarts the debounce timer for the given scan item. When the
// timer fires while a previous run for the same path is still in progress,
// the run is rescheduled for another full debounce interval.
func (d *ScanDebouncer) Debounce(item scanItem) {
	d.mutex.Lock()
	defer d.mutex.Unlock()

	path := item.Path
	force := item.ForceRescan
	if t := d.pending[item.Path]; t != nil {
		// NOTE(review): this assignment is a no-op — it ORs the flag with
		// itself. Presumably it was meant to carry over the ForceRescan flag
		// of the pending (now cancelled) timer, which is not stored anywhere.
		// TODO confirm intent.
		force = force || item.ForceRescan
		t.Stop()
	}

	d.pending[item.Path] = time.AfterFunc(d.after, func() {
		if _, ok := d.inProgress.Load(path); ok {
			// Reschedule this run for when the previous run has finished
			d.mutex.Lock()
			d.pending[path].Reset(d.after)
			d.mutex.Unlock()
			return
		}

		d.inProgress.Store(path, true)
		defer d.inProgress.Delete(path)
		d.f(scanItem{
			Path:        path,
			ForceRescan: force,
		})
	})
}
// workScanQueue starts MaxConcurrency worker goroutines that drain the scan
// queue and assimilate each item. Failures are logged and the worker moves on;
// the workers run for the lifetime of the process.
func (t *Tree) workScanQueue() {
	for worker := 0; worker < t.options.MaxConcurrency; worker++ {
		go func() {
			for {
				item := <-t.scanQueue
				if err := t.assimilate(item); err != nil {
					log.Error().Err(err).Str("path", item.Path).Msg("failed to assimilate item")
				}
			}
		}()
	}
}
// Scan schedules a (debounced) scan of the given path to update the id cache.
// forceRescan requests re-assimilation even for already known nodes.
func (t *Tree) Scan(path string, forceRescan bool) error {
	item := scanItem{
		Path:        path,
		ForceRescan: forceRescan,
	}
	t.scanDebouncer.Debounce(item)
	return nil
}
// assimilate integrates a scanned path into the metadata store: it resolves
// the containing space (optionally scoping the process user to the space's
// group), then either caches the existing node id or assigns a fresh id and
// writes the node's metadata via updateFile.
func (t *Tree) assimilate(item scanItem) error {
	var err error
	// find the space id, scope by the according user
	spaceID := []byte("")
	spaceCandidate := item.Path
	// Walk up towards the storage root until an ancestor carrying a
	// space id attribute is found.
	for strings.HasPrefix(spaceCandidate, t.options.Root) {
		spaceID, err = t.lookup.MetadataBackend().Get(context.Background(), spaceCandidate, prefixes.SpaceIDAttr)
		if err == nil {
			if t.options.UseSpaceGroups {
				// set the uid and gid for the space
				fi, err := os.Stat(spaceCandidate)
				if err != nil {
					return err
				}
				sys := fi.Sys().(*syscall.Stat_t)
				gid := int(sys.Gid)
				_, err = t.userMapper.ScopeUserByIds(-1, gid)
				if err != nil {
					return err
				}
			}
			break
		}
		spaceCandidate = filepath.Dir(spaceCandidate)
	}
	if len(spaceID) == 0 {
		return fmt.Errorf("did not find space id for path")
	}

	var id []byte
	if !item.ForceRescan {
		// already assimilated?
		// NOTE(review): `id, err :=` shadows the outer `id`; harmless here
		// because this branch returns, but staticcheck will flag it.
		id, err := t.lookup.MetadataBackend().Get(context.Background(), item.Path, prefixes.IDAttr)
		if err == nil {
			_ = t.lookup.(*lookup.Lookup).CacheID(context.Background(), string(spaceID), string(id), item.Path)
			return nil
		}
	}

	// lock the file for assimilation
	unlock, err := t.lookup.MetadataBackend().Lock(item.Path)
	if err != nil {
		return errors.Wrap(err, "failed to lock item for assimilation")
	}
	defer func() {
		_ = unlock()
	}()

	// check for the id attribute again after grabbing the lock, maybe the file was assimilated/created by us in the meantime
	id, err = t.lookup.MetadataBackend().Get(context.Background(), item.Path, prefixes.IDAttr)
	if err == nil {
		_ = t.lookup.(*lookup.Lookup).CacheID(context.Background(), string(spaceID), string(id), item.Path)
		if item.ForceRescan {
			_, err = t.updateFile(item.Path, string(id), string(spaceID))
			if err != nil {
				return err
			}
		}
	} else {
		// assimilate new file
		newId := uuid.New().String()
		_, err = t.updateFile(item.Path, newId, string(spaceID))
		if err != nil {
			return err
		}
	}
	return nil
}
// updateFile assimilates a single file or directory at path into the metadata
// store: it writes the id/name/mtime (plus type specific) attributes,
// propagates blob size changes for files and caches the path under the given
// space/node id. If the parent has not been assimilated yet it is assimilated
// first (one retry via the assimilate label).
func (t *Tree) updateFile(path, id, spaceID string) (fs.FileInfo, error) {
	retries := 1
	parentID := ""
assimilate:
	if id != spaceID {
		// read parent
		parentAttribs, err := t.lookup.MetadataBackend().All(context.Background(), filepath.Dir(path))
		if err != nil {
			return nil, fmt.Errorf("failed to read parent item attributes")
		}

		if len(parentAttribs) == 0 || len(parentAttribs[prefixes.IDAttr]) == 0 {
			if retries == 0 {
				return nil, fmt.Errorf("got empty parent attribs even after assimilating")
			}

			// assimilate parent first
			err = t.assimilate(scanItem{Path: filepath.Dir(path), ForceRescan: false})
			if err != nil {
				return nil, err
			}

			// retry
			retries--
			goto assimilate
		}
		parentID = string(parentAttribs[prefixes.IDAttr])
	}

	// assimilate file
	fi, err := os.Stat(path)
	if err != nil {
		return nil, errors.Wrap(err, "failed to stat item")
	}

	previousAttribs, err := t.lookup.MetadataBackend().All(context.Background(), path)
	if err != nil && !metadata.IsAttrUnset(err) {
		return nil, errors.Wrap(err, "failed to get item attribs")
	}

	attributes := node.Attributes{
		prefixes.IDAttr:    []byte(id),
		prefixes.NameAttr:  []byte(filepath.Base(path)),
		prefixes.MTimeAttr: []byte(fi.ModTime().Format(time.RFC3339)),
	}
	if len(parentID) > 0 {
		attributes[prefixes.ParentidAttr] = []byte(parentID)
	}

	sha1h, md5h, adler32h, err := node.CalculateChecksums(context.Background(), path)
	if err == nil {
		attributes[prefixes.ChecksumPrefix+"sha1"] = sha1h.Sum(nil)
		attributes[prefixes.ChecksumPrefix+"md5"] = md5h.Sum(nil)
		attributes[prefixes.ChecksumPrefix+"adler32"] = adler32h.Sum(nil)
	}

	if fi.IsDir() {
		attributes.SetInt64(prefixes.TypeAttr, int64(provider.ResourceType_RESOURCE_TYPE_CONTAINER))
		attributes.SetInt64(prefixes.TreesizeAttr, 0)
		// keep the previously accumulated tree size if there was one
		if previousAttribs != nil && previousAttribs[prefixes.TreesizeAttr] != nil {
			attributes[prefixes.TreesizeAttr] = previousAttribs[prefixes.TreesizeAttr]
		}
		attributes[prefixes.PropagationAttr] = []byte("1")
	} else {
		attributes.SetInt64(prefixes.TypeAttr, int64(provider.ResourceType_RESOURCE_TYPE_FILE))
		attributes.SetString(prefixes.BlobIDAttr, id)
		attributes.SetInt64(prefixes.BlobsizeAttr, fi.Size())

		// propagate the change
		sizeDiff := fi.Size()
		if previousAttribs != nil && previousAttribs[prefixes.BlobsizeAttr] != nil {
			// BUGFIX: read the old size from the *previous* attributes. The
			// old code read it from the freshly built `attributes` (just set
			// to fi.Size() above), so sizeDiff was always 0 on rescans.
			oldSize, err := node.Attributes(previousAttribs).Int64(prefixes.BlobsizeAttr)
			if err == nil {
				sizeDiff -= oldSize
			}
		}

		n := node.New(spaceID, id, parentID, filepath.Base(path), fi.Size(), "", provider.ResourceType_RESOURCE_TYPE_FILE, nil, t.lookup)
		n.SpaceRoot = &node.Node{SpaceID: spaceID, ID: spaceID}
		err = t.Propagate(context.Background(), n, sizeDiff)
		if err != nil {
			return nil, errors.Wrap(err, "failed to propagate")
		}
	}

	err = t.lookup.MetadataBackend().SetMultiple(context.Background(), path, attributes, false)
	if err != nil {
		return nil, errors.Wrap(err, "failed to set attributes")
	}

	_ = t.lookup.(*lookup.Lookup).CacheID(context.Background(), spaceID, id, path)

	return fi, nil
}
@@ -0,0 +1,85 @@
package tree
import (
"bufio"
"encoding/json"
"io"
"os"
"strconv"
"time"
"github.com/cs3org/reva/v2/pkg/storage/fs/posix/lookup"
)
// GpfsFileAuditLoggingWatcher feeds filesystem events from a GPFS file audit
// log into the tree's scan queue.
type GpfsFileAuditLoggingWatcher struct {
	tree *Tree
}
// lwe is a single GPFS lightweight event as decoded from the JSON lines of
// the audit log / watch folder stream.
type lwe struct {
	Event string
	Path  string
	// BytesWritten is a decimal string in the event payload; parsed with Atoi.
	BytesWritten string
}
// NewGpfsFileAuditLoggingWatcher returns a watcher for the given tree. It
// verifies that the audit log file exists before returning.
func NewGpfsFileAuditLoggingWatcher(tree *Tree, auditLogFile string) (*GpfsFileAuditLoggingWatcher, error) {
	if _, err := os.Stat(auditLogFile); err != nil {
		return nil, err
	}
	return &GpfsFileAuditLoggingWatcher{tree: tree}, nil
}
// Watch tails the GPFS file audit log at path and schedules tree scans for
// CREATE/CLOSE/RENAME events. It never returns; on any error the file is
// reopened after a 5 second backoff.
func (w *GpfsFileAuditLoggingWatcher) Watch(path string) {
	for {
		if err := w.tail(path); err != nil {
			// try again later
			time.Sleep(5 * time.Second)
		}
	}
}

// tail opens the audit log, seeks to its end and processes events line by
// line until a read error occurs. The file is closed on return — the old
// code leaked one file descriptor per restart because `defer file.Close()`
// accumulated across `goto start` in a function that never returned.
func (w *GpfsFileAuditLoggingWatcher) tail(path string) error {
	file, err := os.Open(path)
	if err != nil {
		return err
	}
	defer file.Close()

	// Seek to the end of the file: only events after startup are processed.
	if _, err := file.Seek(0, io.SeekEnd); err != nil {
		return err
	}

	reader := bufio.NewReader(file)
	for {
		line, err := reader.ReadString('\n')
		switch err {
		case nil:
			// Decode into a fresh value per line. The old code reused one
			// shared *lwe while goroutines read its Path field — a data race.
			var ev lwe
			if err := json.Unmarshal([]byte(line), &ev); err != nil {
				continue
			}
			w.handleEvent(ev)
		case io.EOF:
			// no new lines yet
			time.Sleep(1 * time.Second)
		default:
			return err
		}
	}
}

// handleEvent schedules the tree scans for a single audit log event. ev is
// passed by value so the goroutines below own their copy.
func (w *GpfsFileAuditLoggingWatcher) handleEvent(ev lwe) {
	switch ev.Event {
	case "CREATE":
		go func() { _ = w.tree.Scan(ev.Path, false) }()
	case "CLOSE":
		if bytesWritten, err := strconv.Atoi(ev.BytesWritten); err == nil && bytesWritten > 0 {
			go func() { _ = w.tree.Scan(ev.Path, true) }()
		}
	case "RENAME":
		go func() {
			_ = w.tree.Scan(ev.Path, true)
			_ = w.tree.lookup.(*lookup.Lookup).WarmupIDCache(ev.Path)
		}()
	}
}
@@ -0,0 +1,67 @@
package tree
import (
"context"
"encoding/json"
"log"
"strconv"
"strings"
"github.com/cs3org/reva/v2/pkg/storage/fs/posix/lookup"
kafka "github.com/segmentio/kafka-go"
)
// GpfsWatchFolderWatcher feeds filesystem events from a GPFS watch folder
// kafka topic into the tree's scan queue.
type GpfsWatchFolderWatcher struct {
	tree *Tree
	// brokers are the kafka broker addresses to consume from.
	brokers []string
}
// NewGpfsWatchFolderWatcher returns a watcher for the given tree that will
// consume events from the given kafka brokers.
func NewGpfsWatchFolderWatcher(tree *Tree, kafkaBrokers []string) (*GpfsWatchFolderWatcher, error) {
	watcher := &GpfsWatchFolderWatcher{
		tree:    tree,
		brokers: kafkaBrokers,
	}
	return watcher, nil
}
// Watch consumes GPFS watch folder events from the given kafka topic and
// schedules tree scans for create/write/move events. Lock files are ignored.
// The loop ends when ReadMessage fails (e.g. the reader is closed).
func (w *GpfsWatchFolderWatcher) Watch(topic string) {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: w.brokers,
		GroupID: "ocis-posixfs",
		Topic:   topic,
	})

	for {
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			break
		}

		// Decode into a fresh value per message. The old code reused one
		// shared *lwe across iterations while the goroutines below read its
		// fields — a data race with the next Unmarshal.
		var ev lwe
		if err := json.Unmarshal(m.Value, &ev); err != nil {
			continue
		}

		if strings.HasSuffix(ev.Path, ".flock") || strings.HasSuffix(ev.Path, ".mlock") {
			continue
		}

		switch {
		case strings.Contains(ev.Event, "IN_CREATE"):
			go func() { _ = w.tree.Scan(ev.Path, false) }()

		case strings.Contains(ev.Event, "IN_CLOSE_WRITE"):
			if bytesWritten, err := strconv.Atoi(ev.BytesWritten); err == nil && bytesWritten > 0 {
				go func() { _ = w.tree.Scan(ev.Path, true) }()
			}

		case strings.Contains(ev.Event, "IN_MOVED_TO"):
			go func() {
				_ = w.tree.Scan(ev.Path, true)
				_ = w.tree.lookup.(*lookup.Lookup).WarmupIDCache(ev.Path)
			}()
		}
	}
	if err := r.Close(); err != nil {
		log.Fatal("failed to close reader:", err)
	}
}
@@ -0,0 +1,73 @@
package tree
import (
"fmt"
"strings"
"github.com/cs3org/reva/v2/pkg/storage/fs/posix/lookup"
"github.com/pablodz/inotifywaitgo/inotifywaitgo"
)
// InotifyWatcher feeds inotify filesystem events into the tree's scan queue.
type InotifyWatcher struct {
	tree *Tree
}
// NewInotifyWatcher returns a new inotify watcher for the given tree.
func NewInotifyWatcher(tree *Tree) *InotifyWatcher {
	watcher := &InotifyWatcher{tree: tree}
	return watcher
}
// Watch recursively watches path via inotifywait and schedules tree scans
// for CREATE, MOVED_TO and CLOSE_WRITE events. Lock files are ignored.
// It never returns and panics if the inotifywait binary is not installed.
func (iw *InotifyWatcher) Watch(path string) {
	events := make(chan inotifywaitgo.FileEvent)
	errors := make(chan error)

	go inotifywaitgo.WatchPath(&inotifywaitgo.Settings{
		Dir:        path,
		FileEvents: events,
		ErrorChan:  errors,
		KillOthers: true,
		Options: &inotifywaitgo.Options{
			Recursive: true,
			Events: []inotifywaitgo.EVENT{
				inotifywaitgo.CREATE,
				inotifywaitgo.MOVED_TO,
				inotifywaitgo.CLOSE_WRITE,
			},
			// Monitor keeps inotifywait running indefinitely — presumably a
			// requirement for the endless loop below; TODO confirm against
			// the inotifywaitgo documentation.
			Monitor: true,
		},
		Verbose: false,
	})

	for {
		select {
		case event := <-events:
			for _, e := range event.Events {
				// skip lock files
				if strings.HasSuffix(event.Filename, ".flock") || strings.HasSuffix(event.Filename, ".mlock") {
					continue
				}
				switch e {
				case inotifywaitgo.CREATE:
					go func() { _ = iw.tree.Scan(event.Filename, false) }()
				case inotifywaitgo.MOVED_TO:
					go func() {
						_ = iw.tree.Scan(event.Filename, true)
						// moves can bring whole subtrees; refresh the id cache below it
						_ = iw.tree.lookup.(*lookup.Lookup).WarmupIDCache(event.Filename)
					}()
				case inotifywaitgo.CLOSE_WRITE:
					go func() { _ = iw.tree.Scan(event.Filename, true) }()
				}
			}
		case err := <-errors:
			switch err.Error() {
			case inotifywaitgo.NOT_INSTALLED:
				panic("Error: inotifywait is not installed")
			case inotifywaitgo.INVALID_EVENT:
				// ignore
			default:
				fmt.Printf("Error: %s\n", err)
			}
		}
	}
}
+170 -165
View File
@@ -30,23 +30,28 @@ import (
"strings"
"time"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/propagator"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"go-micro.dev/v4/store"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/logger"
"github.com/cs3org/reva/v2/pkg/storage/fs/posix/lookup"
"github.com/cs3org/reva/v2/pkg/storage/fs/posix/options"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/propagator"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/usermapper"
"github.com/cs3org/reva/v2/pkg/utils"
)
var tracer trace.Tracer
@@ -62,6 +67,15 @@ type Blobstore interface {
Delete(node *node.Node) error
}
// Watcher watches a path for filesystem changes and reports them to the tree.
type Watcher interface {
	Watch(path string)
}
// scanItem describes a single pending scan of a path in the tree.
type scanItem struct {
	Path        string
	ForceRescan bool // scan the path even if it is already known
}
// Tree manages a hierarchical tree
type Tree struct {
lookup node.PathLookup
@@ -70,26 +84,76 @@ type Tree struct {
options *options.Options
idCache store.Store
userMapper usermapper.Mapper
idCache store.Store
watcher Watcher
scanQueue chan scanItem
scanDebouncer *ScanDebouncer
log *zerolog.Logger
}
// PermissionCheckFunc defined a function used to check resource permissions
type PermissionCheckFunc func(rp *provider.ResourcePermissions) bool
// New returns a new instance of Tree
func New(lu node.PathLookup, bs Blobstore, o *options.Options, cache store.Store) *Tree {
return &Tree{
func New(lu node.PathLookup, bs Blobstore, um usermapper.Mapper, o *options.Options, cache store.Store) (*Tree, error) {
log := logger.New()
scanQueue := make(chan scanItem)
t := &Tree{
lookup: lu,
blobstore: bs,
userMapper: um,
options: o,
idCache: cache,
propagator: propagator.New(lu, o),
propagator: propagator.New(lu, &o.Options),
scanQueue: scanQueue,
scanDebouncer: NewScanDebouncer(500*time.Millisecond, func(item scanItem) {
scanQueue <- item
}),
log: log,
}
watchPath := o.WatchPath
var err error
switch o.WatchType {
case "gpfswatchfolder":
t.watcher, err = NewGpfsWatchFolderWatcher(t, strings.Split(o.WatchFolderKafkaBrokers, ","))
if err != nil {
return nil, err
}
case "gpfsfileauditlogging":
t.watcher, err = NewGpfsFileAuditLoggingWatcher(t, o.WatchPath)
if err != nil {
return nil, err
}
default:
t.watcher = NewInotifyWatcher(t)
watchPath = o.Root
}
// Start watching for fs events and put them into the queue
go t.watcher.Watch(watchPath)
// Handle queued fs events
go t.workScanQueue()
return t, nil
}
// Setup prepares the tree structure
func (t *Tree) Setup() error {
return os.MkdirAll(t.options.Root, 0700)
err := os.MkdirAll(t.options.Root, 0700)
if err != nil {
return err
}
err = os.MkdirAll(t.options.UploadDirectory, 0700)
if err != nil {
return err
}
return nil
}
// GetMD returns the metadata of a node in the tree
@@ -115,35 +179,48 @@ func (t *Tree) TouchFile(ctx context.Context, n *node.Node, markprocessing bool,
return errtypes.AlreadyExists(n.ID)
}
parentPath := n.ParentPath()
nodePath := filepath.Join(parentPath, n.Name)
// lock the meta file
unlock, err := t.lookup.MetadataBackend().Lock(nodePath)
if err != nil {
return err
}
defer func() {
_ = unlock()
}()
if n.ID == "" {
n.ID = uuid.New().String()
}
n.SetType(provider.ResourceType_RESOURCE_TYPE_FILE)
nodePath := n.InternalPath()
// Set id in cache
_ = t.lookup.(*lookup.Lookup).CacheID(context.Background(), n.SpaceID, n.ID, nodePath)
if err := os.MkdirAll(filepath.Dir(nodePath), 0700); err != nil {
return errors.Wrap(err, "Decomposedfs: error creating node")
}
_, err := os.Create(nodePath)
_, err = os.Create(nodePath)
if err != nil {
return errors.Wrap(err, "Decomposedfs: error creating node")
}
attributes := n.NodeMetadata(ctx)
attributes[prefixes.IDAttr] = []byte(n.ID)
if markprocessing {
attributes[prefixes.StatusPrefix] = []byte(node.ProcessingStatus)
}
nodeMTime := time.Now()
if mtime != "" {
if err := n.SetMtimeString(ctx, mtime); err != nil {
return errors.Wrap(err, "Decomposedfs: could not set mtime")
}
} else {
now := time.Now()
if err := n.SetMtime(ctx, &now); err != nil {
return errors.Wrap(err, "Decomposedfs: could not set mtime")
nodeMTime, err = utils.MTimeToTime(mtime)
if err != nil {
return err
}
}
err = n.SetXattrsWithContext(ctx, attributes, true)
attributes[prefixes.MTimeAttr] = []byte(nodeMTime.UTC().Format(time.RFC3339Nano))
err = n.SetXattrsWithContext(ctx, attributes, false)
if err != nil {
return err
}
@@ -192,47 +269,9 @@ func (t *Tree) Move(ctx context.Context, oldNode *node.Node, newNode *node.Node)
}
}
// remove cache entry in any case to avoid inconsistencies
defer func() { _ = t.idCache.Delete(filepath.Join(oldNode.ParentPath(), oldNode.Name)) }()
// Always target the old node ID for xattr updates.
// The new node id is empty if the target does not exist
// and we need to overwrite the new one when overwriting an existing path.
// are we just renaming (parent stays the same)?
if oldNode.ParentID == newNode.ParentID {
// parentPath := t.lookup.InternalPath(oldNode.SpaceID, oldNode.ParentID)
parentPath := oldNode.ParentPath()
// rename child
err = os.Rename(
filepath.Join(parentPath, oldNode.Name),
filepath.Join(parentPath, newNode.Name),
)
if err != nil {
return errors.Wrap(err, "Decomposedfs: could not rename child")
}
// update name attribute
if err := oldNode.SetXattrString(ctx, prefixes.NameAttr, newNode.Name); err != nil {
return errors.Wrap(err, "Decomposedfs: could not set name attribute")
}
return t.Propagate(ctx, newNode, 0)
}
// we are moving the node to a new parent, any target has been removed
// bring old node to the new parent
// rename child
err = os.Rename(
filepath.Join(oldNode.ParentPath(), oldNode.Name),
filepath.Join(newNode.ParentPath(), newNode.Name),
)
if err != nil {
return errors.Wrap(err, "Decomposedfs: could not move child")
}
// update target parentid and name
attribs := node.Attributes{}
attribs.SetString(prefixes.ParentidAttr, newNode.ParentID)
@@ -253,6 +292,28 @@ func (t *Tree) Move(ctx context.Context, oldNode *node.Node, newNode *node.Node)
sizeDiff = oldNode.Blobsize
}
// rename node
err = os.Rename(
filepath.Join(oldNode.ParentPath(), oldNode.Name),
filepath.Join(newNode.ParentPath(), newNode.Name),
)
if err != nil {
return errors.Wrap(err, "Decomposedfs: could not move child")
}
// update the id cache
if newNode.ID == "" {
newNode.ID = oldNode.ID
}
_ = t.lookup.(*lookup.Lookup).CacheID(ctx, newNode.SpaceID, newNode.ID, filepath.Join(newNode.ParentPath(), newNode.Name))
// update id cache for the moved subtree
if oldNode.IsDir(ctx) {
err = t.lookup.(*lookup.Lookup).WarmupIDCache(filepath.Join(newNode.ParentPath(), newNode.Name))
if err != nil {
return err
}
}
// TODO inefficient because we might update several nodes twice, only propagate unchanged nodes?
// collect in a list, then only stat each node once
// also do this in a go routine ... webdav should check the etag async
@@ -268,18 +329,6 @@ func (t *Tree) Move(ctx context.Context, oldNode *node.Node, newNode *node.Node)
return nil
}
// readChildNodeFromLink resolves the symlink at path and derives the child
// node id from its target by stripping leading "/" and "." characters and
// removing the remaining path separators.
func readChildNodeFromLink(ctx context.Context, path string) (string, error) {
	_, span := tracer.Start(ctx, "readChildNodeFromLink")
	defer span.End()

	target, err := os.Readlink(path)
	if err != nil {
		return "", err
	}
	trimmed := strings.TrimLeft(target, "/.")
	return strings.ReplaceAll(trimmed, "/", ""), nil
}
// ListFolder lists the content of a folder node
func (t *Tree) ListFolder(ctx context.Context, n *node.Node) ([]*node.Node, error) {
ctx, span := tracer.Start(ctx, "ListFolder")
@@ -329,22 +378,27 @@ func (t *Tree) ListFolder(ctx context.Context, n *node.Node) ([]*node.Node, erro
// Spawn workers that'll concurrently work the queue
for i := 0; i < numWorkers; i++ {
g.Go(func() error {
var err error
// switch user if necessary
spaceGID, ok := ctx.Value(decomposedfs.CtxKeySpaceGID).(uint32)
if ok {
unscope, err := t.userMapper.ScopeUserByIds(-1, int(spaceGID))
if err != nil {
return errors.Wrap(err, "failed to scope user")
}
defer func() { _ = unscope() }()
}
for name := range work {
path := filepath.Join(dir, name)
nodeID := getNodeIDFromCache(ctx, path, t.idCache)
if nodeID == "" {
nodeID, err = readChildNodeFromLink(ctx, path)
if err != nil {
return err
}
err = storeNodeIDInCache(ctx, path, nodeID, t.idCache)
if err != nil {
return err
nodeID, err := t.lookup.MetadataBackend().Get(ctx, path, prefixes.IDAttr)
if err != nil {
if metadata.IsAttrUnset(err) {
continue
}
return err
}
child, err := node.ReadNode(ctx, t.lookup, n.SpaceID, nodeID, false, n.SpaceRoot, true)
child, err := node.ReadNode(ctx, t.lookup, n.SpaceID, string(nodeID), false, n.SpaceRoot, true)
if err != nil {
return err
}
@@ -384,7 +438,12 @@ func (t *Tree) ListFolder(ctx context.Context, n *node.Node) ([]*node.Node, erro
// Delete deletes a node in the tree by moving it to the trash
func (t *Tree) Delete(ctx context.Context, n *node.Node) (err error) {
path := filepath.Join(n.ParentPath(), n.Name)
path := n.InternalPath()
if !strings.HasPrefix(path, t.options.Root) {
return errtypes.InternalError("invalid internal path")
}
// remove entry from cache immediately to avoid inconsistencies
defer func() { _ = t.idCache.Delete(path) }()
@@ -392,19 +451,7 @@ func (t *Tree) Delete(ctx context.Context, n *node.Node) (err error) {
if deletingSharedResource != nil && deletingSharedResource.(bool) {
src := filepath.Join(n.ParentPath(), n.Name)
return os.Remove(src)
}
// get the original path
origin, err := t.lookup.Path(ctx, n, node.NoCheck)
if err != nil {
return
}
// set origin location in metadata
nodePath := n.InternalPath()
if err := n.SetXattrString(ctx, prefixes.TrashOriginAttr, origin); err != nil {
return err
return os.RemoveAll(src)
}
var sizeDiff int64
@@ -418,53 +465,11 @@ func (t *Tree) Delete(ctx context.Context, n *node.Node) (err error) {
sizeDiff = -n.Blobsize
}
deletionTime := time.Now().UTC().Format(time.RFC3339Nano)
// Prepare the trash
trashLink := filepath.Join(t.options.Root, "spaces", lookup.Pathify(n.SpaceRoot.ID, 1, 2), "trash", lookup.Pathify(n.ID, 4, 2))
if err := os.MkdirAll(filepath.Dir(trashLink), 0700); err != nil {
// Roll back changes
_ = n.RemoveXattr(ctx, prefixes.TrashOriginAttr, true)
return err
}
// FIXME can we just move the node into the trash dir? instead of adding another symlink and appending a trash timestamp?
// can we just use the mtime as the trash time?
// TODO store a trashed by userid
// first make node appear in the space trash
// parent id and name are stored as extended attributes in the node itself
err = os.Symlink("../../../../../nodes/"+lookup.Pathify(n.ID, 4, 2)+node.TrashIDDelimiter+deletionTime, trashLink)
if err != nil {
// Roll back changes
_ = n.RemoveXattr(ctx, prefixes.TrashOriginAttr, true)
return
}
// at this point we have a symlink pointing to a non existing destination, which is fine
// rename the trashed node so it is not picked up when traversing up the tree and matches the symlink
trashPath := nodePath + node.TrashIDDelimiter + deletionTime
err = os.Rename(nodePath, trashPath)
if err != nil {
// To roll back changes
// TODO remove symlink
// Roll back changes
_ = n.RemoveXattr(ctx, prefixes.TrashOriginAttr, true)
return
}
err = t.lookup.MetadataBackend().Rename(nodePath, trashPath)
if err != nil {
_ = n.RemoveXattr(ctx, prefixes.TrashOriginAttr, true)
_ = os.Rename(trashPath, nodePath)
return
}
// Remove lock file if it exists
_ = os.Remove(n.LockFilePath())
// finally remove the entry from the parent dir
if err = os.Remove(path); err != nil {
if err = os.RemoveAll(path); err != nil {
// To roll back changes
// TODO revert the rename
// TODO remove symlink
@@ -729,18 +734,37 @@ func (t *Tree) InitNewNode(ctx context.Context, n *node.Node, fsize uint64) (met
func (t *Tree) createDirNode(ctx context.Context, n *node.Node) (err error) {
ctx, span := tracer.Start(ctx, "createDirNode")
defer span.End()
idcache := t.lookup.(*lookup.Lookup).IDCache
// create a directory node
nodePath := n.InternalPath()
if err := os.MkdirAll(nodePath, 0700); err != nil {
parentPath, ok := idcache.Get(ctx, n.SpaceID, n.ParentID)
if !ok {
return errtypes.NotFound(n.ParentID)
}
path := filepath.Join(parentPath, n.Name)
// lock the meta file
unlock, err := t.lookup.MetadataBackend().Lock(path)
if err != nil {
return err
}
defer func() {
_ = unlock()
}()
if err := os.MkdirAll(path, 0700); err != nil {
return errors.Wrap(err, "Decomposedfs: error creating node")
}
_ = idcache.Set(ctx, n.SpaceID, n.ID, path)
attributes := n.NodeMetadata(ctx)
attributes[prefixes.IDAttr] = []byte(n.ID)
attributes[prefixes.TreesizeAttr] = []byte("0") // initialize as empty, TODO why bother? if it is not set we could treat it as 0?
if t.options.TreeTimeAccounting || t.options.TreeSizeAccounting {
attributes[prefixes.PropagationAttr] = []byte("1") // mark the node for propagation
}
return n.SetXattrsWithContext(ctx, attributes, true)
return n.SetXattrsWithContext(ctx, attributes, false)
}
var nodeIDRegep = regexp.MustCompile(`.*/nodes/([^.]*).*`)
@@ -814,22 +838,3 @@ func (t *Tree) readRecycleItem(ctx context.Context, spaceID, key, path string) (
return
}
// getNodeIDFromCache looks up the node id stored for path in the cache.
// It returns the empty string when the path is not cached or the read fails.
func getNodeIDFromCache(ctx context.Context, path string, cache store.Store) string {
	_, span := tracer.Start(ctx, "getNodeIDFromCache")
	defer span.End()

	recs, err := cache.Read(path)
	if err != nil || len(recs) == 0 {
		return ""
	}
	return string(recs[0].Value)
}
// storeNodeIDInCache persists the node id for path in the cache.
func storeNodeIDInCache(ctx context.Context, path string, nodeID string, cache store.Store) error {
	_, span := tracer.Start(ctx, "storeNodeIDInCache")
	defer span.End()

	rec := &store.Record{
		Key:   path,
		Value: []byte(nodeID),
	}
	return cache.Write(rec)
}
+11
View File
@@ -23,6 +23,8 @@ import (
"io"
"net/url"
tusd "github.com/tus/tusd/pkg/handler"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
registry "github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1"
)
@@ -71,6 +73,15 @@ type FS interface {
DeleteStorageSpace(ctx context.Context, req *provider.DeleteStorageSpaceRequest) error
}
// UnscopeFunc is a function that unscopes a user.
type UnscopeFunc func()
// ComposableFS is the interface that a struct needs to implement
// to be composable, so that it can support the TUS methods.
type ComposableFS interface {
	UseIn(composer *tusd.StoreComposer)
}
// Registry is the interface that storage registries implement
// for discovering storage providers
type Registry interface {
@@ -22,6 +22,7 @@ import (
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/permissions"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/usermapper"
)
// Aspects holds dependencies for handling aspects of the decomposedfs
@@ -31,4 +32,5 @@ type Aspects struct {
Permissions permissions.Permissions
EventStream events.Stream
DisableVersioning bool
UserMapper usermapper.Mapper
}
@@ -51,6 +51,7 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/spaceidindex"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/usermapper"
"github.com/cs3org/reva/v2/pkg/storage/utils/filelocks"
"github.com/cs3org/reva/v2/pkg/storage/utils/templates"
"github.com/cs3org/reva/v2/pkg/storagespace"
@@ -65,6 +66,12 @@ import (
"golang.org/x/sync/errgroup"
)
// CtxKey is the type used for decomposedfs specific context keys.
type CtxKey int

const (
	// CtxKeySpaceGID is the context key under which the numeric group id
	// of the space being operated on is stored (read as uint32 by callers).
	CtxKeySpaceGID CtxKey = iota
)
var (
tracer trace.Tracer
@@ -105,6 +112,7 @@ type Decomposedfs struct {
tp node.Tree
o *options.Options
p permissions.Permissions
um usermapper.Mapper
chunkHandler *chunking.ChunkHandler
stream events.Stream
sessionStore SessionStore
@@ -200,19 +208,25 @@ func New(o *options.Options, aspects aspects.Aspects) (storage.FS, error) {
return nil, err
}
// set a null usermapper if we don't have one
if aspects.UserMapper == nil {
aspects.UserMapper = &usermapper.NullMapper{}
}
fs := &Decomposedfs{
tp: aspects.Tree,
lu: aspects.Lookup,
o: o,
p: aspects.Permissions,
um: aspects.UserMapper,
chunkHandler: chunking.NewChunkHandler(filepath.Join(o.Root, "uploads")),
stream: aspects.EventStream,
UserCache: ttlcache.NewCache(),
userSpaceIndex: userSpaceIndex,
groupSpaceIndex: groupSpaceIndex,
spaceTypeIndex: spaceTypeIndex,
sessionStore: upload.NewSessionStore(aspects.Lookup, aspects.Tree, o.Root, aspects.EventStream, o.AsyncFileUploads, o.Tokens, aspects.DisableVersioning),
}
fs.sessionStore = upload.NewSessionStore(fs, aspects, o.Root, o.AsyncFileUploads, o.Tokens)
if o.AsyncFileUploads {
if fs.stream == nil {
@@ -884,7 +898,7 @@ func (fs *Decomposedfs) GetMD(ctx context.Context, ref *provider.Reference, mdKe
}
}
if addSpace {
if md.Space, err = fs.storageSpaceFromNode(ctx, node, true); err != nil {
if md.Space, err = fs.StorageSpaceFromNode(ctx, node, true); err != nil {
return nil, err
}
}
+13 -10
View File
@@ -20,7 +20,6 @@ package decomposedfs
import (
"context"
"os"
"path/filepath"
"strings"
@@ -29,11 +28,11 @@ import (
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/storage/utils/ace"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/rogpeppe/go-internal/lockedfile"
)
// DenyGrant denies access to a resource.
@@ -80,7 +79,9 @@ func (fs *Decomposedfs) AddGrant(ctx context.Context, ref *provider.Reference, g
if err != nil {
return err
}
defer unlockFunc()
defer func() {
_ = unlockFunc()
}()
if grant != nil {
return errtypes.AlreadyExists(filepath.Join(grantNode.ParentID, grantNode.Name))
@@ -176,7 +177,9 @@ func (fs *Decomposedfs) RemoveGrant(ctx context.Context, ref *provider.Reference
if err != nil {
return err
}
defer unlockFunc()
defer func() {
_ = unlockFunc()
}()
if grant == nil {
return errtypes.NotFound("grant not found")
@@ -237,7 +240,9 @@ func (fs *Decomposedfs) UpdateGrant(ctx context.Context, ref *provider.Reference
if err != nil {
return err
}
defer unlockFunc()
defer func() {
_ = unlockFunc()
}()
if grant == nil {
// grant not found
@@ -264,9 +269,7 @@ func (fs *Decomposedfs) UpdateGrant(ctx context.Context, ref *provider.Reference
}
// checks if the given grant exists and returns it. Nil grant means it doesn't exist
func (fs *Decomposedfs) loadGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) (*node.Node, func(), *provider.Grant, error) {
var unlockFunc func()
func (fs *Decomposedfs) loadGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) (*node.Node, metadata.UnlockFunc, *provider.Grant, error) {
n, err := fs.lu.NodeFromResource(ctx, ref)
if err != nil {
return nil, nil, nil, err
@@ -275,11 +278,11 @@ func (fs *Decomposedfs) loadGrant(ctx context.Context, ref *provider.Reference,
return nil, nil, nil, errtypes.NotFound(filepath.Join(n.ParentID, n.Name))
}
f, err := lockedfile.OpenFile(fs.lu.MetadataBackend().LockfilePath(n.InternalPath()), os.O_RDWR|os.O_CREATE, 0600)
// lock the metadata file
unlockFunc, err := fs.lu.MetadataBackend().Lock(n.InternalPath())
if err != nil {
return nil, nil, nil, err
}
unlockFunc = func() { f.Close() }
grants, err := n.ListGrants(ctx)
if err != nil {
@@ -23,6 +23,7 @@ import (
"os"
"syscall"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/pkg/errors"
"github.com/pkg/xattr"
)
@@ -30,6 +31,9 @@ import (
// IsNotExist checks if there is a os not exists error buried inside the xattr error,
// as we cannot just use os.IsNotExist().
func IsNotExist(err error) bool {
if _, ok := err.(errtypes.IsNotFound); ok {
return true
}
if os.IsNotExist(errors.Cause(err)) {
return true
}
@@ -59,5 +63,10 @@ func IsNotDir(err error) bool {
return serr == syscall.ENOTDIR
}
}
if xerr, ok := errors.Cause(err).(*xattr.Error); ok {
if serr, ok2 := xerr.Err.(syscall.Errno); ok2 {
return serr == syscall.ENOTDIR
}
}
return false
}
+34 -18
View File
@@ -76,6 +76,24 @@ func (fs *Decomposedfs) CreateStorageSpace(ctx context.Context, req *provider.Cr
spaceID = reqSpaceID
}
// Check if space already exists
rootPath := ""
switch req.Type {
case _spaceTypePersonal:
if fs.o.PersonalSpacePathTemplate != "" {
rootPath = filepath.Join(fs.o.Root, templates.WithUser(u, fs.o.PersonalSpacePathTemplate))
}
default:
if fs.o.GeneralSpacePathTemplate != "" {
rootPath = filepath.Join(fs.o.Root, templates.WithSpacePropertiesAndUser(u, req.Type, req.Name, spaceID, fs.o.GeneralSpacePathTemplate))
}
}
if rootPath != "" {
if _, err := os.Stat(rootPath); err == nil {
return nil, errtypes.AlreadyExists("decomposedfs: spaces: space already exists")
}
}
description := utils.ReadPlainFromOpaque(req.Opaque, "description")
alias := utils.ReadPlainFromOpaque(req.Opaque, "spaceAlias")
if alias == "" {
@@ -97,20 +115,18 @@ func (fs *Decomposedfs) CreateStorageSpace(ctx context.Context, req *provider.Cr
// create a directory node
root.SetType(provider.ResourceType_RESOURCE_TYPE_CONTAINER)
rootPath := root.InternalPath()
switch req.Type {
case _spaceTypePersonal:
if fs.o.PersonalSpacePathTemplate != "" {
rootPath = filepath.Join(fs.o.Root, templates.WithUser(u, fs.o.PersonalSpacePathTemplate))
}
default:
if fs.o.GeneralSpacePathTemplate != "" {
rootPath = filepath.Join(fs.o.Root, templates.WithSpacePropertiesAndUser(u, req.Type, req.Name, spaceID, fs.o.GeneralSpacePathTemplate))
}
if rootPath == "" {
rootPath = root.InternalPath()
}
if err := os.MkdirAll(rootPath, 0700); err != nil {
return nil, errors.Wrap(err, "Decomposedfs: error creating node")
// set 755 permissions for the base dir
if err := os.MkdirAll(filepath.Dir(rootPath), 0755); err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("Decomposedfs: error creating spaces base dir %s", filepath.Dir(rootPath)))
}
// 770 permissions for the space
if err := os.MkdirAll(rootPath, 0770); err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("Decomposedfs: error creating space %s", rootPath))
}
// Store id in cache
@@ -199,7 +215,7 @@ func (fs *Decomposedfs) CreateStorageSpace(ctx context.Context, req *provider.Cr
}
}
space, err := fs.storageSpaceFromNode(ctx, root, true)
space, err := fs.StorageSpaceFromNode(ctx, root, true)
if err != nil {
return nil, err
}
@@ -289,7 +305,7 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
// return empty list
return spaces, nil
}
space, err := fs.storageSpaceFromNode(ctx, n, checkNodePermissions)
space, err := fs.StorageSpaceFromNode(ctx, n, checkNodePermissions)
if err != nil {
return nil, err
}
@@ -432,7 +448,7 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
continue
}
space, err := fs.storageSpaceFromNode(ctx, n, checkNodePermissions)
space, err := fs.StorageSpaceFromNode(ctx, n, checkNodePermissions)
if err != nil {
switch err.(type) {
case errtypes.IsPermissionDenied:
@@ -485,7 +501,7 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
return nil, err
}
if n.Exists {
space, err := fs.storageSpaceFromNode(ctx, n, checkNodePermissions)
space, err := fs.StorageSpaceFromNode(ctx, n, checkNodePermissions)
if err != nil {
return nil, err
}
@@ -670,7 +686,7 @@ func (fs *Decomposedfs) UpdateStorageSpace(ctx context.Context, req *provider.Up
}
// send back the updated data from the storage
updatedSpace, err := fs.storageSpaceFromNode(ctx, spaceNode, false)
updatedSpace, err := fs.StorageSpaceFromNode(ctx, spaceNode, false)
if err != nil {
return nil, err
}
@@ -779,7 +795,7 @@ func (fs *Decomposedfs) linkStorageSpaceType(ctx context.Context, spaceType, spa
return fs.spaceTypeIndex.Add(spaceType, spaceID, target)
}
func (fs *Decomposedfs) storageSpaceFromNode(ctx context.Context, n *node.Node, checkPermissions bool) (*provider.StorageSpace, error) {
func (fs *Decomposedfs) StorageSpaceFromNode(ctx context.Context, n *node.Node, checkPermissions bool) (*provider.StorageSpace, error) {
user := ctxpkg.ContextMustGetUser(ctx)
if checkPermissions {
rp, err := fs.p.AssemblePermissions(ctx, n)
@@ -601,9 +601,13 @@ func (t *Tree) RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPa
return errors.Wrap(err, "Decomposedfs: could not resolve trash root")
}
deletePath = filepath.Join(resolvedTrashRoot, trashPath)
}
if err = os.Remove(deletePath); err != nil {
logger.Error().Err(err).Str("trashItem", trashItem).Msg("error deleting trash item")
if err = os.Remove(deletePath); err != nil {
logger.Error().Err(err).Str("trashItem", trashItem).Str("deletePath", deletePath).Str("trashPath", trashPath).Msg("error deleting trash item")
}
} else {
if err = utils.RemoveItem(deletePath); err != nil {
logger.Error().Err(err).Str("trashItem", trashItem).Str("deletePath", deletePath).Str("trashPath", trashPath).Msg("error recursively deleting trash item")
}
}
var sizeDiff int64
@@ -651,7 +655,7 @@ func (t *Tree) PurgeRecycleItemFunc(ctx context.Context, spaceid, key string, pa
}
deletePath = filepath.Join(resolvedTrashRoot, path)
}
if err = os.Remove(deletePath); err != nil {
if err = utils.RemoveItem(deletePath); err != nil {
logger.Error().Err(err).Str("deletePath", deletePath).Msg("error deleting trash item")
return err
}
+26 -10
View File
@@ -24,6 +24,7 @@ import (
"os"
"path/filepath"
"strings"
"syscall"
"time"
"github.com/google/uuid"
@@ -180,6 +181,13 @@ func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Refere
session.SetStorageValue("SpaceRoot", n.SpaceRoot.ID) // TODO SpaceRoot -> SpaceID
session.SetStorageValue("SpaceOwnerOrManager", n.SpaceOwnerOrManager(ctx).GetOpaqueId()) // TODO needed for what?
// remember the gid of the space
fi, err := os.Stat(n.SpaceRoot.InternalPath())
if err != nil {
return nil, err
}
session.SetStorageValue("SpaceGid", fmt.Sprintf("%d", (fi.Sys().(*syscall.Stat_t).Gid)))
iid, _ := ctxpkg.ContextGetInitiator(ctx)
session.SetMetadata("initiatorid", iid)
@@ -298,18 +306,20 @@ func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Refere
session.SetStorageValue("LogLevel", log.GetLevel().String())
log.Debug().Interface("session", session).Msg("Decomposedfs: built session info")
// Create binary file in the upload folder with no content
// It will be used when determining the current offset of an upload
err = session.TouchBin()
err = fs.um.RunInBaseScope(func() error {
// Create binary file in the upload folder with no content
// It will be used when determining the current offset of an upload
err := session.TouchBin()
if err != nil {
return err
}
return session.Persist(ctx)
})
if err != nil {
return nil, err
}
err = session.Persist(ctx)
if err != nil {
return nil, err
}
metrics.UploadSessionsInitiated.Inc()
if uploadLength == 0 {
@@ -345,7 +355,13 @@ func (fs *Decomposedfs) NewUpload(ctx context.Context, info tusd.FileInfo) (tusd
// GetUpload returns the Upload for the given upload id
func (fs *Decomposedfs) GetUpload(ctx context.Context, id string) (tusd.Upload, error) {
return fs.sessionStore.Get(ctx, id)
var ul tusd.Upload
var err error
_ = fs.um.RunInBaseScope(func() error {
ul, err = fs.sessionStore.Get(ctx, id)
return nil
})
return ul, err
}
// ListUploadSessions returns the upload sessions for the given filter
@@ -34,10 +34,13 @@ import (
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/aspects"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/usermapper"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/rogpeppe/go-internal/lockedfile"
@@ -53,8 +56,10 @@ type PermissionsChecker interface {
// OcisStore manages upload sessions
type OcisStore struct {
fs storage.FS
lu node.PathLookup
tp node.Tree
um usermapper.Mapper
root string
pub events.Publisher
async bool
@@ -63,15 +68,17 @@ type OcisStore struct {
}
// NewSessionStore returns a new OcisStore
func NewSessionStore(lu node.PathLookup, tp node.Tree, root string, pub events.Publisher, async bool, tknopts options.TokenOptions, disableVersioning bool) *OcisStore {
func NewSessionStore(fs storage.FS, aspects aspects.Aspects, root string, async bool, tknopts options.TokenOptions) *OcisStore {
return &OcisStore{
lu: lu,
tp: tp,
fs: fs,
lu: aspects.Lookup,
tp: aspects.Tree,
root: root,
pub: pub,
pub: aspects.EventStream,
async: async,
tknopts: tknopts,
disableVersioning: disableVersioning,
disableVersioning: aspects.DisableVersioning,
um: aspects.UserMapper,
}
}
@@ -230,7 +237,7 @@ func (store OcisStore) CreateNodeForUpload(session *OcisSession, initAttrs node.
unlock, err = store.tp.InitNewNode(ctx, n, uint64(session.Size()))
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Msg("failed to init new node")
appctx.GetLogger(ctx).Error().Str("path", n.InternalPath()).Err(err).Msg("failed to init new node")
}
session.info.MetaData["sizeDiff"] = strconv.FormatInt(session.Size(), 10)
}
@@ -270,7 +277,10 @@ func (store OcisStore) CreateNodeForUpload(session *OcisSession, initAttrs node.
return nil, errors.Wrap(err, "Decomposedfs: could not write metadata")
}
if err := session.Persist(ctx); err != nil {
err = store.um.RunInBaseScope(func() error {
return session.Persist(ctx)
})
if err != nil {
return nil, err
}
@@ -340,9 +350,8 @@ func (store OcisStore) updateExistingNode(ctx context.Context, session *OcisSess
}
}
versionPath := n.InternalPath()
if !store.disableVersioning {
versionPath = session.store.lu.InternalPath(spaceID, n.ID+node.RevisionIDDelimiter+oldNodeMtime.UTC().Format(time.RFC3339Nano))
versionPath := session.store.lu.InternalPath(spaceID, n.ID+node.RevisionIDDelimiter+oldNodeMtime.UTC().Format(time.RFC3339Nano))
// create version node
if _, err := os.Create(versionPath); err != nil {
@@ -359,14 +368,14 @@ func (store OcisStore) updateExistingNode(ctx context.Context, session *OcisSess
}, f, true); err != nil {
return unlock, err
}
session.info.MetaData["versionsPath"] = versionPath
// keep mtime from previous version
if err := os.Chtimes(session.info.MetaData["versionsPath"], oldNodeMtime, oldNodeMtime); err != nil {
return unlock, errtypes.InternalError(fmt.Sprintf("failed to change mtime of version node: %s", err))
}
}
session.info.MetaData["sizeDiff"] = strconv.FormatInt((int64(fsize) - old.Blobsize), 10)
session.info.MetaData["versionsPath"] = versionPath
// keep mtime from previous version
if err := os.Chtimes(session.info.MetaData["versionsPath"], oldNodeMtime, oldNodeMtime); err != nil {
return unlock, errtypes.InternalError(fmt.Sprintf("failed to change mtime of version node: %s", err))
}
session.info.MetaData["sizeDiff"] = strconv.FormatInt((int64(fsize) - old.Blobsize), 10)
return unlock, nil
}
@@ -26,6 +26,7 @@ import (
"io"
"io/fs"
"os"
"strconv"
"strings"
"time"
@@ -143,6 +144,22 @@ func (session *OcisSession) FinishUpload(ctx context.Context) error {
prefixes.ChecksumPrefix + "adler32": adler32h.Sum(nil),
}
// At this point we scope by the space to create the final file in the final location
if session.store.um != nil && session.info.Storage["SpaceGid"] != "" {
gid, err := strconv.Atoi(session.info.Storage["SpaceGid"])
if err != nil {
return errors.Wrap(err, "failed to parse space gid")
}
unscope, err := session.store.um.ScopeUserByIds(-1, gid)
if err != nil {
return errors.Wrap(err, "failed to scope user")
}
if unscope != nil {
defer func() { _ = unscope() }()
}
}
n, err := session.store.CreateNodeForUpload(session, attrs)
if err != nil {
session.store.Cleanup(ctx, session, true, false, false)
@@ -198,7 +215,9 @@ func (session *OcisSession) Terminate(_ context.Context) error {
func (session *OcisSession) DeclareLength(ctx context.Context, length int64) error {
session.info.Size = length
session.info.SizeIsDeferred = false
return session.Persist(session.Context(ctx))
return session.store.um.RunInBaseScope(func() error {
return session.Persist(session.Context(ctx))
})
}
// ConcatUploads concatenates multiple uploads
@@ -231,7 +250,8 @@ func (session *OcisSession) Finalize() (err error) {
ctx, span := tracer.Start(session.Context(context.Background()), "Finalize")
defer span.End()
revisionNode := &node.Node{SpaceID: session.SpaceID(), BlobID: session.ID(), Blobsize: session.Size()}
revisionNode := node.New(session.SpaceID(), session.NodeID(), "", "", session.Size(), session.ID(),
provider.ResourceType_RESOURCE_TYPE_FILE, session.SpaceOwner(), session.store.lu)
// upload the data to the blobstore
_, subspan := tracer.Start(ctx, "WriteBlob")
@@ -270,9 +290,9 @@ func (session *OcisSession) Cleanup(revertNodeMetadata, cleanBin, cleanInfo bool
if revertNodeMetadata {
n, err := session.Node(ctx)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Str("uploadid", session.ID()).Msg("reading node for session failed")
appctx.GetLogger(ctx).Error().Err(err).Str("sessionid", session.ID()).Msg("reading node for session failed")
}
if session.NodeExists() {
if session.NodeExists() && session.info.MetaData["versionsPath"] != "" {
p := session.info.MetaData["versionsPath"]
if err := session.store.lu.CopyMetadata(ctx, p, n.InternalPath(), func(attributeName string, value []byte) (newValue []byte, copy bool) {
return value, strings.HasPrefix(attributeName, prefixes.ChecksumPrefix) ||
@@ -0,0 +1,56 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
package usermapper
import (
"context"
)
// Mapper is the interface that wraps the basic mapping methods.
//
// Implementations switch the identity the current execution runs as and hand
// back a restore function that reverts to the previously active scope.
type Mapper interface {
	// RunInBaseScope runs f while scoped to the base identity, restoring
	// the previous scope afterwards.
	RunInBaseScope(f func() error) error
	// ScopeBase switches to the base identity; the returned function
	// restores the previous scope.
	ScopeBase() (func() error, error)
	// ScopeUser switches to the identity of the user carried in ctx; the
	// returned function restores the previous scope.
	ScopeUser(ctx context.Context) (func() error, error)
	// ScopeUserByIds switches to the given uid and gid (negative values
	// leave the corresponding id untouched in the unix implementation);
	// the returned function restores the previous scope.
	ScopeUserByIds(uid, gid int) (func() error, error)
}
// UnscopeFunc is a function that unscopes the current user, restoring the
// scope that was active before the corresponding Scope* call.
type UnscopeFunc func() error
// NullMapper is a user mapper that does nothing
type NullMapper struct{}
// RunInBaseScope runs the given function in the scope of the base user
func (nm *NullMapper) RunInBaseScope(f func() error) error {
return f()
}
// ScopeBase returns to the base uid and gid returning a function that can be used to restore the previous scope
func (nm *NullMapper) ScopeBase() (func() error, error) {
return func() error { return nil }, nil
}
// ScopeUser returns to the base uid and gid returning a function that can be used to restore the previous scope
func (nm *NullMapper) ScopeUser(ctx context.Context) (func() error, error) {
return func() error { return nil }, nil
}
func (nm *NullMapper) ScopeUserByIds(uid, gid int) (func() error, error) {
return func() error { return nil }, nil
}
@@ -0,0 +1,131 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
package usermapper
import (
"context"
"fmt"
"os/user"
"runtime"
"strconv"
"golang.org/x/sys/unix"
revactx "github.com/cs3org/reva/v2/pkg/ctx"
)
// UnixMapper is a user mapper that maps users to unix uids and gids by
// switching the calling thread's filesystem uid/gid (setfsuid/setfsgid).
type UnixMapper struct {
	baseUid int // fsuid observed at construction time; used as the base identity
	baseGid int // fsgid observed at construction time; used as the base identity
}
// NewUnixMapper returns a new user mapper, capturing the current fsuid and
// fsgid as the base identity.
func NewUnixMapper() *UnixMapper {
	// Passing -1 queries the current value without changing it.
	// NOTE(review): the returned errors are ignored — confirm the query
	// form cannot fail, or handle the error explicitly.
	baseUid, _ := unix.SetfsuidRetUid(-1)
	baseGid, _ := unix.SetfsgidRetGid(-1)

	return &UnixMapper{
		baseUid: baseUid,
		baseGid: baseGid,
	}
}
// RunInBaseScope runs the given function in the scope of the base user,
// restoring the previous scope when f returns.
func (um *UnixMapper) RunInBaseScope(f func() error) error {
	unscope, err := um.ScopeBase()
	if err != nil {
		return err
	}
	// Always restore the previous scope; the unscope error is discarded so
	// f's error is not masked.
	defer func() { _ = unscope() }()

	return f()
}
// ScopeBase returns to the base uid and gid returning a function that can be
// used to restore the previous scope.
//
// NOTE(review): only the gid is scoped here — uid is passed as -1 and left
// unchanged. Confirm this asymmetry is intentional.
func (um *UnixMapper) ScopeBase() (func() error, error) {
	return um.ScopeUserByIds(-1, um.baseGid)
}
// ScopeUser scopes the current thread to the uid and gid of the user carried
// in ctx, returning a function that can be used to restore the previous
// scope. The ids are resolved by looking the username up in the OS user
// database.
func (um *UnixMapper) ScopeUser(ctx context.Context) (func() error, error) {
	// Must* variant — presumably panics when ctx carries no user; callers
	// are expected to have authenticated the request first.
	u := revactx.ContextMustGetUser(ctx)
	uid, gid, err := um.mapUser(u.Username)
	if err != nil {
		return nil, err
	}

	return um.ScopeUserByIds(uid, gid)
}
// ScopeUserByIds scopes the current thread to the given uid and gid,
// returning a function that must be called to restore the previous scope.
//
// A negative uid or gid leaves the corresponding id untouched. Because
// setfsuid/setfsgid only affect the calling OS thread, the goroutine is
// locked to its thread until the restore function runs. On error any partial
// change is rolled back and the thread lock is released before returning
// (previously an error return left the goroutine pinned to the thread
// forever).
func (um *UnixMapper) ScopeUserByIds(uid, gid int) (func() error, error) {
	runtime.LockOSThread() // fsuid/fsgid are per-thread attributes

	var prevUid, prevGid int
	uidChanged := false

	// fail undoes a partial uid change and releases the thread lock so an
	// error return does not leave the goroutine pinned or half-scoped.
	fail := func(err error) (func() error, error) {
		if uidChanged {
			_ = unix.Setfsuid(prevUid)
		}
		runtime.UnlockOSThread()
		return nil, err
	}

	if uid >= 0 {
		var err error
		prevUid, err = unix.SetfsuidRetUid(uid)
		if err != nil {
			return fail(err)
		}
		// Verify the change took effect by reading the value back.
		if testUid, _ := unix.SetfsuidRetUid(-1); testUid != uid {
			return fail(fmt.Errorf("failed to setfsuid to %d", uid))
		}
		uidChanged = true
	}
	if gid >= 0 {
		var err error
		prevGid, err = unix.SetfsgidRetGid(gid)
		if err != nil {
			return fail(err)
		}
		// Verify the change took effect by reading the value back.
		if testGid, _ := unix.SetfsgidRetGid(-1); testGid != gid {
			return fail(fmt.Errorf("failed to setfsgid to %d", gid))
		}
	}

	return func() error {
		if uid >= 0 {
			_ = unix.Setfsuid(prevUid)
		}
		if gid >= 0 {
			_ = unix.Setfsgid(prevGid)
		}
		runtime.UnlockOSThread()
		return nil
	}, nil
}
// mapUser resolves username against the OS user database and returns the
// user's numeric uid and gid.
//
// The receiver is named um for consistency with the other UnixMapper
// methods (it was previously u, which also collided with the conventional
// name for the looked-up user).
func (um *UnixMapper) mapUser(username string) (int, int, error) {
	userDetails, err := user.Lookup(username)
	if err != nil {
		return 0, 0, err
	}
	// user.Lookup returns ids as strings; convert them to ints.
	uid, err := strconv.Atoi(userDetails.Uid)
	if err != nil {
		return 0, 0, err
	}
	gid, err := strconv.Atoi(userDetails.Gid)
	if err != nil {
		return 0, 0, err
	}

	return uid, gid, nil
}
File diff suppressed because it is too large Load Diff