Merge pull request #8340 from dragonchaser/filename-incrementor

[Full-ci] Bump reva
This commit is contained in:
Christian Richter
2024-02-05 12:54:45 +01:00
committed by GitHub
31 changed files with 1787 additions and 121 deletions

View File

@@ -0,0 +1,6 @@
Enhancement: bump reva
We have bumped reva to pull in the changes needed to automatically increment filenames on upload collisions in secret filedrops.
https://github.com/owncloud/ocis/pull/8340
https://github.com/owncloud/ocis/issues/8291

2
go.mod
View File

@@ -12,7 +12,7 @@ require (
github.com/blevesearch/bleve/v2 v2.3.10
github.com/coreos/go-oidc/v3 v3.9.0
github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781
github.com/cs3org/reva/v2 v2.18.1-0.20240129131717-cff0a2eeb959
github.com/cs3org/reva/v2 v2.18.1-0.20240205065033-2c21ada2ae52
github.com/dhowden/tag v0.0.0-20230630033851-978a0926ee25
github.com/disintegration/imaging v1.6.2
github.com/dutchcoders/go-clamd v0.0.0-20170520113014-b970184f4d9e

4
go.sum
View File

@@ -1019,8 +1019,8 @@ github.com/crewjam/saml v0.4.14 h1:g9FBNx62osKusnFzs3QTN5L9CVA/Egfgm+stJShzw/c=
github.com/crewjam/saml v0.4.14/go.mod h1:UVSZCf18jJkk6GpWNVqcyQJMD5HsRugBPf4I1nl2mME=
github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781 h1:BUdwkIlf8IS2FasrrPg8gGPHQPOrQ18MS1Oew2tmGtY=
github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
github.com/cs3org/reva/v2 v2.18.1-0.20240129131717-cff0a2eeb959 h1:8uiYRWlbrhQJk4pHawpJUTFx/Yy0G5yvMYam2TMKDYo=
github.com/cs3org/reva/v2 v2.18.1-0.20240129131717-cff0a2eeb959/go.mod h1:GCN3g6uYE0Nvd31dGlhaGGyUviUfbG2NkecPRv5oSc4=
github.com/cs3org/reva/v2 v2.18.1-0.20240205065033-2c21ada2ae52 h1:Jeh8q6WKl4gcK7GMayn56y1uxbw91XvyuZcvY7SiDRk=
github.com/cs3org/reva/v2 v2.18.1-0.20240205065033-2c21ada2ae52/go.mod h1:GCN3g6uYE0Nvd31dGlhaGGyUviUfbG2NkecPRv5oSc4=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
github.com/cyphar/filepath-securejoin v0.2.4 h1:Ugdm7cg7i6ZK6x3xDF1oEu1nfkyfH53EtKeQYTC3kyg=
github.com/cyphar/filepath-securejoin v0.2.4/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4=

View File

@@ -118,7 +118,6 @@ cannot share a folder with create permission
#### [Upload-only shares must not overwrite but create a separate file](https://github.com/owncloud/ocis/issues/1267)
- [coreApiSharePublicLink2/uploadToPublicLinkShare.feature:13](https://github.com/owncloud/ocis/blob/master/tests/acceptance/features/coreApiSharePublicLink2/uploadToPublicLinkShare.feature#L13)
- [coreApiSharePublicLink2/uploadToPublicLinkShare.feature:121](https://github.com/owncloud/ocis/blob/master/tests/acceptance/features/coreApiSharePublicLink2/uploadToPublicLinkShare.feature#L121)
#### [d:quota-available-bytes in dprop of PROPFIND give wrong response value](https://github.com/owncloud/ocis/issues/8197)

View File

@@ -0,0 +1,20 @@
package ocdav
import (
"context"
cs3storage "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
)
// tokenStatInfoKey is the unexported context key under which the stat info
// of a public-link token is stored. Using a private struct type guarantees
// no other package can collide with this key.
type tokenStatInfoKey struct{}

// ContextWithTokenStatInfo adds the token stat info to the context
func ContextWithTokenStatInfo(ctx context.Context, info *cs3storage.ResourceInfo) context.Context {
	key := tokenStatInfoKey{}
	return context.WithValue(ctx, key, info)
}
// TokenStatInfoFromContext returns the token stat info from the context
// and a boolean indicating whether it was present.
func TokenStatInfoFromContext(ctx context.Context) (*cs3storage.ResourceInfo, bool) {
	if info, ok := ctx.Value(tokenStatInfoKey{}).(*cs3storage.ResourceInfo); ok {
		return info, true
	}
	return nil, false
}

View File

@@ -44,8 +44,6 @@ const (
_trashbinPath = "trash-bin"
)
type tokenStatInfoKey struct{}
// DavHandler routes to the different sub handlers
type DavHandler struct {
AvatarsHandler *AvatarsHandler
@@ -318,9 +316,9 @@ func (h *DavHandler) Handler(s *svc) http.Handler {
}
log.Debug().Interface("statInfo", sRes.Info).Msg("Stat info from public link token path")
ctx := ContextWithTokenStatInfo(ctx, sRes.Info)
r = r.WithContext(ctx)
if sRes.Info.Type != provider.ResourceType_RESOURCE_TYPE_CONTAINER {
ctx := context.WithValue(ctx, tokenStatInfoKey{}, sRes.Info)
r = r.WithContext(ctx)
h.PublicFileHandler.Handler(s).ServeHTTP(w, r)
} else {
h.PublicFolderHandler.Handler(s).ServeHTTP(w, r)

View File

@@ -159,6 +159,8 @@ var (
ErrNoSuchLock = errors.New("webdav: no such lock")
// ErrNotImplemented is returned when hitting not implemented code paths
ErrNotImplemented = errors.New("webdav: not implemented")
// ErrTokenStatInfoMissing is returned when the token stat info is missing from the context
ErrTokenStatInfoMissing = errors.New("webdav: token stat info missing")
)
// HandleErrorStatus checks the status code, logs a Debug or Error level message

View File

@@ -0,0 +1,47 @@
package ocdav
import (
"context"
"errors"
"path/filepath"
"strconv"
"strings"
gatewayv1beta1 "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
)
// FindName returns the next filename available when the given name is already
// taken in the parent container. It lists the parent's children and appends
// " (N)" (N starting at 2) before the extension until an unused name is found.
// Compound archive extensions like ".tar.gz" are kept together.
// It returns the new name, the rpc status of the list call, and any transport error.
func FindName(ctx context.Context, client gatewayv1beta1.GatewayAPIClient, name string, parentid *provider.ResourceId) (string, *rpc.Status, error) {
	lReq := &provider.ListContainerRequest{
		Ref: &provider.Reference{
			ResourceId: parentid,
		},
	}
	lRes, err := client.ListContainer(ctx, lReq)
	if err != nil {
		return "", nil, err
	}
	if lRes.Status.Code != rpc.Code_CODE_OK {
		return "", lRes.Status, nil
	}
	// collect the sibling names into a set so candidate lookups are O(1)
	itemMap := make(map[string]struct{}, len(lRes.Infos))
	for _, fi := range lRes.Infos {
		itemMap[fi.GetName()] = struct{}{}
	}
	ext := filepath.Ext(name)
	fileName := strings.TrimSuffix(name, ext)
	// keep compound archive extensions intact, e.g. "x.tar.gz" -> base "x", ext ".tar.gz"
	// (previously this rebuilt the extension from the already-trimmed base name,
	// producing a broken "..gz" style suffix)
	if strings.HasSuffix(fileName, ".tar") {
		fileName = strings.TrimSuffix(fileName, ".tar")
		ext = ".tar" + ext
	}
	// starts with two because "normal" humans begin counting with 1 and we say the existing file is the first one
	for i := 2; i < len(itemMap)+3; i++ {
		candidate := fileName + " (" + strconv.Itoa(i) + ")" + ext
		if _, ok := itemMap[candidate]; !ok {
			return candidate, lRes.GetStatus(), nil
		}
	}
	return "", nil, errors.New("could not determine new filename")
}

View File

@@ -26,7 +26,7 @@ import (
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/cs3org/reva/v2/internal/http/services/owncloud/ocdav/errors"
ocdaverrors "github.com/cs3org/reva/v2/internal/http/services/owncloud/ocdav/errors"
"github.com/cs3org/reva/v2/internal/http/services/owncloud/ocdav/net"
"github.com/cs3org/reva/v2/internal/http/services/owncloud/ocdav/propfind"
"github.com/cs3org/reva/v2/pkg/appctx"
@@ -96,7 +96,16 @@ func (s *svc) handlePropfindOnToken(w http.ResponseWriter, r *http.Request, ns s
ctx, span := appctx.GetTracerProvider(r.Context()).Tracer(tracerName).Start(r.Context(), "token_propfind")
defer span.End()
tokenStatInfo := ctx.Value(tokenStatInfoKey{}).(*provider.ResourceInfo)
tokenStatInfo, ok := TokenStatInfoFromContext(ctx)
if !ok {
span.RecordError(ocdaverrors.ErrTokenStatInfoMissing)
span.SetStatus(codes.Error, ocdaverrors.ErrTokenStatInfoMissing.Error())
span.SetAttributes(semconv.HTTPStatusCodeKey.Int(http.StatusInternalServerError))
w.WriteHeader(http.StatusInternalServerError)
b, err := ocdaverrors.Marshal(http.StatusInternalServerError, ocdaverrors.ErrTokenStatInfoMissing.Error(), "")
ocdaverrors.HandleWebdavError(appctx.GetLogger(ctx), w, b, err)
return
}
sublog := appctx.GetLogger(ctx).With().Interface("tokenStatInfo", tokenStatInfo).Logger()
sublog.Debug().Msg("handlePropfindOnToken")
@@ -109,20 +118,20 @@ func (s *svc) handlePropfindOnToken(w http.ResponseWriter, r *http.Request, ns s
sublog.Debug().Str("depth", dh).Msg(err.Error())
w.WriteHeader(http.StatusBadRequest)
m := fmt.Sprintf("Invalid Depth header value: %v", dh)
b, err := errors.Marshal(http.StatusBadRequest, m, "")
errors.HandleWebdavError(&sublog, w, b, err)
b, err := ocdaverrors.Marshal(http.StatusBadRequest, m, "")
ocdaverrors.HandleWebdavError(&sublog, w, b, err)
return
}
if depth == net.DepthInfinity && !s.c.AllowPropfindDepthInfinitiy {
span.RecordError(errors.ErrInvalidDepth)
span.RecordError(ocdaverrors.ErrInvalidDepth)
span.SetStatus(codes.Error, "DEPTH: infinity is not supported")
span.SetAttributes(semconv.HTTPStatusCodeKey.Int(http.StatusBadRequest))
sublog.Debug().Str("depth", dh).Msg(errors.ErrInvalidDepth.Error())
sublog.Debug().Str("depth", dh).Msg(ocdaverrors.ErrInvalidDepth.Error())
w.WriteHeader(http.StatusBadRequest)
m := fmt.Sprintf("Invalid Depth header value: %v", dh)
b, err := errors.Marshal(http.StatusBadRequest, m, "")
errors.HandleWebdavError(&sublog, w, b, err)
b, err := ocdaverrors.Marshal(http.StatusBadRequest, m, "")
ocdaverrors.HandleWebdavError(&sublog, w, b, err)
return
}

View File

@@ -155,6 +155,47 @@ func (s *svc) handlePut(ctx context.Context, w http.ResponseWriter, r *http.Requ
w.WriteHeader(http.StatusInternalServerError)
return
}
// Test if the target is a secret filedrop
tokenStatInfo, ok := TokenStatInfoFromContext(ctx)
// We assume that when the uploader can create containers, but is not allowed to list them, it is a secret file drop
if ok && tokenStatInfo.GetPermissionSet().CreateContainer && !tokenStatInfo.GetPermissionSet().ListContainer {
// TODO we can skip this stat if the tokenStatInfo is the direct parent
sReq := &provider.StatRequest{
Ref: ref,
}
sRes, err := client.Stat(ctx, sReq)
if err != nil {
log.Error().Err(err).Msg("error sending grpc stat request")
w.WriteHeader(http.StatusInternalServerError)
return
}
// We also need to continue if we are not allowed to stat a resource. We may not have stat permission. That still means it exists and we need to find a new filename.
switch sRes.Status.Code {
case rpc.Code_CODE_OK, rpc.Code_CODE_PERMISSION_DENIED:
// find next filename
newName, status, err := FindName(ctx, client, filepath.Base(ref.Path), sRes.GetInfo().GetParentId())
if err != nil {
log.Error().Err(err).Msg("error sending grpc stat request")
w.WriteHeader(http.StatusInternalServerError)
return
}
if status.Code != rpc.Code_CODE_OK {
log.Error().Interface("status", status).Msg("error listing file")
errors.HandleErrorStatus(&log, w, status)
return
}
ref.Path = utils.MakeRelativePath(filepath.Join(filepath.Dir(ref.GetPath()), newName))
case rpc.Code_CODE_NOT_FOUND:
// just continue with normal upload
default:
log.Error().Interface("status", sRes.Status).Msg("error stating file")
errors.HandleErrorStatus(&log, w, sRes.Status)
return
}
}
opaque := &typespb.Opaque{}
if mtime := r.Header.Get(net.HeaderOCMtime); mtime != "" {
utils.AppendPlainToOpaque(opaque, net.HeaderOCMtime, mtime)

View File

@@ -24,6 +24,7 @@ import (
"io"
"net/http"
"path"
"path/filepath"
"strconv"
"strings"
"time"
@@ -115,6 +116,15 @@ func (s *svc) handleTusPost(ctx context.Context, w http.ResponseWriter, r *http.
w.WriteHeader(http.StatusPreconditionFailed)
return
}
// Test if the target is a secret filedrop
var isSecretFileDrop bool
tokenStatInfo, ok := TokenStatInfoFromContext(ctx)
// We assume that when the uploader can create containers, but is not allowed to list them, it is a secret file drop
if ok && tokenStatInfo.GetPermissionSet().CreateContainer && !tokenStatInfo.GetPermissionSet().ListContainer {
isSecretFileDrop = true
}
// r.Header.Get(net.HeaderOCChecksum)
// TODO must be SHA1, ADLER32 or MD5 ... in capital letters????
// curl -X PUT https://demo.owncloud.com/remote.php/webdav/testcs.bin -u demo:demo -d '123' -v -H 'OC-Checksum: SHA1:40bd001563085fc35165329ea1ff5c5ecbdbbeef'
@@ -158,6 +168,22 @@ func (s *svc) handleTusPost(ctx context.Context, w http.ResponseWriter, r *http.
return
}
}
if isSecretFileDrop {
// find next filename
newName, status, err := FindName(ctx, client, filepath.Base(ref.Path), sRes.GetInfo().GetParentId())
if err != nil {
log.Error().Err(err).Msg("error sending grpc stat request")
w.WriteHeader(http.StatusInternalServerError)
return
}
if status.GetCode() != rpc.Code_CODE_OK {
log.Error().Interface("status", status).Msg("error listing file")
errors.HandleErrorStatus(&log, w, status)
return
}
ref.Path = filepath.Join(filepath.Dir(ref.GetPath()), newName)
sRes.GetInfo().Name = newName
}
}
uploadLength, err := strconv.ParseInt(r.Header.Get(net.HeaderUploadLength), 10, 64)

View File

@@ -72,10 +72,6 @@ func NewFile(c map[string]interface{}) (publicshare.Manager, error) {
}
p := file.New(conf.File)
if err := p.Init(context.Background()); err != nil {
return nil, err
}
return New(conf.GatewayAddr, conf.SharePasswordHashCost, conf.JanitorRunInterval, conf.EnableExpiredSharesCleanup, p)
}
@@ -89,10 +85,6 @@ func NewMemory(c map[string]interface{}) (publicshare.Manager, error) {
conf.init()
p := memory.New()
if err := p.Init(context.Background()); err != nil {
return nil, err
}
return New(conf.GatewayAddr, conf.SharePasswordHashCost, conf.JanitorRunInterval, conf.EnableExpiredSharesCleanup, p)
}
@@ -111,10 +103,6 @@ func NewCS3(c map[string]interface{}) (publicshare.Manager, error) {
}
p := cs3.New(s)
if err := p.Init(context.Background()); err != nil {
return nil, err
}
return New(conf.GatewayAddr, conf.SharePasswordHashCost, conf.JanitorRunInterval, conf.EnableExpiredSharesCleanup, p)
}
@@ -174,6 +162,10 @@ type manager struct {
enableExpiredSharesCleanup bool
}
func (m *manager) init() error {
return m.persistence.Init(context.Background())
}
func (m *manager) startJanitorRun() {
if !m.enableExpiredSharesCleanup {
return
@@ -200,6 +192,10 @@ func (m *manager) Dump(ctx context.Context, shareChan chan<- *publicshare.WithPa
m.mutex.Lock()
defer m.mutex.Unlock()
if err := m.init(); err != nil {
return err
}
db, err := m.persistence.Read(ctx)
if err != nil {
return err
@@ -222,6 +218,10 @@ func (m *manager) Load(ctx context.Context, shareChan <-chan *publicshare.WithPa
m.mutex.Lock()
defer m.mutex.Unlock()
if err := m.init(); err != nil {
return err
}
db, err := m.persistence.Read(ctx)
if err != nil {
return err
@@ -296,6 +296,10 @@ func (m *manager) CreatePublicShare(ctx context.Context, u *user.User, rInfo *pr
m.mutex.Lock()
defer m.mutex.Unlock()
if err := m.init(); err != nil {
return nil, err
}
encShare, err := utils.MarshalProtoV1ToJSON(&ps.PublicShare)
if err != nil {
return nil, err
@@ -385,6 +389,10 @@ func (m *manager) UpdatePublicShare(ctx context.Context, u *user.User, req *link
m.mutex.Lock()
defer m.mutex.Unlock()
if err := m.init(); err != nil {
return nil, err
}
db, err := m.persistence.Read(ctx)
if err != nil {
return nil, err
@@ -420,6 +428,10 @@ func (m *manager) GetPublicShare(ctx context.Context, u *user.User, ref *link.Pu
m.mutex.Lock()
defer m.mutex.Unlock()
if err := m.init(); err != nil {
return nil, err
}
if ref.GetToken() != "" {
ps, pw, err := m.getByToken(ctx, ref.GetToken())
if err != nil {
@@ -473,6 +485,10 @@ func (m *manager) ListPublicShares(ctx context.Context, u *user.User, filters []
m.mutex.Lock()
defer m.mutex.Unlock()
if err := m.init(); err != nil {
return nil, err
}
log := appctx.GetLogger(ctx)
db, err := m.persistence.Read(ctx)
@@ -555,6 +571,10 @@ func (m *manager) cleanupExpiredShares() {
m.mutex.Lock()
defer m.mutex.Unlock()
if err := m.init(); err != nil {
return
}
db, _ := m.persistence.Read(context.Background())
for _, v := range db {
@@ -594,6 +614,11 @@ func (m *manager) revokeExpiredPublicShare(ctx context.Context, s *link.PublicSh
func (m *manager) RevokePublicShare(ctx context.Context, _ *user.User, ref *link.PublicShareReference) error {
m.mutex.Lock()
defer m.mutex.Unlock()
if err := m.init(); err != nil {
return err
}
return m.revokePublicShare(ctx, ref)
}
@@ -651,6 +676,10 @@ func (m *manager) GetPublicShareByToken(ctx context.Context, token string, auth
m.mutex.Lock()
defer m.mutex.Unlock()
if err := m.init(); err != nil {
return nil, err
}
db, err := m.persistence.Read(ctx)
if err != nil {
return nil, err

View File

@@ -24,6 +24,7 @@ import (
"fmt"
"os"
"path/filepath"
"sync"
"github.com/cs3org/reva/v2/pkg/publicshare/manager/json/persistence"
)
@@ -31,16 +32,28 @@ import (
type file struct {
path string
initialized bool
lock *sync.RWMutex
}
// New returns a new Cache instance
func New(path string) persistence.Persistence {
return &file{
path: path,
lock: &sync.RWMutex{},
}
}
func (p *file) Init(_ context.Context) error {
if p.isInitialized() {
return nil
}
p.lock.Lock()
defer p.lock.Unlock()
if p.initialized {
return nil
}
// attempt to create the db file
var fi os.FileInfo
var err error
@@ -65,7 +78,7 @@ func (p *file) Init(_ context.Context) error {
}
func (p *file) Read(_ context.Context) (persistence.PublicShares, error) {
if !p.initialized {
if !p.isInitialized() {
return nil, fmt.Errorf("not initialized")
}
db := map[string]interface{}{}
@@ -80,7 +93,7 @@ func (p *file) Read(_ context.Context) (persistence.PublicShares, error) {
}
func (p *file) Write(_ context.Context, db persistence.PublicShares) error {
if !p.initialized {
if !p.isInitialized() {
return fmt.Errorf("not initialized")
}
dbAsJSON, err := json.Marshal(db)
@@ -90,3 +103,9 @@ func (p *file) Write(_ context.Context, db persistence.PublicShares) error {
return os.WriteFile(p.path, dbAsJSON, 0644)
}
func (p *file) isInitialized() bool {
p.lock.RLock()
defer p.lock.RUnlock()
return p.initialized
}

View File

@@ -31,6 +31,7 @@ import (
_ "github.com/cs3org/reva/v2/pkg/storage/fs/nextcloud"
_ "github.com/cs3org/reva/v2/pkg/storage/fs/ocis"
_ "github.com/cs3org/reva/v2/pkg/storage/fs/owncloudsql"
_ "github.com/cs3org/reva/v2/pkg/storage/fs/posix"
_ "github.com/cs3org/reva/v2/pkg/storage/fs/s3"
_ "github.com/cs3org/reva/v2/pkg/storage/fs/s3ng"
// Add your own here

View File

@@ -0,0 +1,122 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
package blobstore
import (
"bufio"
"fmt"
"io"
"os"
"path/filepath"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/pkg/errors"
)
// Blobstore provides an interface to an filesystem based blobstore
type Blobstore struct {
	root string // base directory under which all blobs are stored
}

// New returns a new Blobstore rooted at the given directory,
// creating the directory (and parents) if necessary.
func New(root string) (*Blobstore, error) {
	if err := os.MkdirAll(root, 0700); err != nil {
		return nil, err
	}
	return &Blobstore{root: root}, nil
}
// Upload stores some data in the blobstore under the given key.
// It first tries a cheap rename of the source file; if that fails
// (e.g. the source is on a different filesystem) it falls back to copying.
func (bs *Blobstore) Upload(node *node.Node, source string) error {
	dest, err := bs.path(node)
	if err != nil {
		return err
	}
	// ensure parent path exists
	if err := os.MkdirAll(filepath.Dir(dest), 0700); err != nil {
		return errors.Wrap(err, "Decomposedfs: oCIS blobstore: error creating parent folders for blob")
	}

	// fast path: move the file into place
	if err := os.Rename(source, dest); err == nil {
		return nil
	}

	// Rename failed, file needs to be copied.
	file, err := os.Open(source)
	if err != nil {
		return errors.Wrap(err, "Decomposedfs: oCIS blobstore: Can not open source file to upload")
	}
	defer file.Close()

	f, err := os.OpenFile(dest, os.O_CREATE|os.O_WRONLY, 0700)
	if err != nil {
		return errors.Wrapf(err, "could not open blob '%s' for writing", dest)
	}
	w := bufio.NewWriter(f)
	if _, err := w.ReadFrom(file); err != nil {
		f.Close()
		return errors.Wrapf(err, "could not write blob '%s'", dest)
	}
	if err := w.Flush(); err != nil {
		f.Close()
		return errors.Wrapf(err, "could not write blob '%s'", dest)
	}
	// the destination was previously never closed, leaking a file descriptor
	// per copied upload; close it and surface any close error
	return f.Close()
}
// Download retrieves a blob from the blobstore for reading.
// The caller is responsible for closing the returned ReadCloser.
func (bs *Blobstore) Download(node *node.Node) (io.ReadCloser, error) {
	p, err := bs.path(node)
	if err != nil {
		return nil, err
	}
	f, err := os.Open(p)
	if err != nil {
		return nil, errors.Wrapf(err, "could not read blob '%s'", p)
	}
	return f, nil
}
// Delete deletes a blob from the blobstore.
func (bs *Blobstore) Delete(node *node.Node) error {
	p, err := bs.path(node)
	if err != nil {
		return err
	}
	if err = utils.RemoveItem(p); err != nil {
		return errors.Wrapf(err, "could not delete blob '%s'", p)
	}
	return nil
}
// path maps a node to its on-disk blob location below the blobstore root.
// Layout: <root>/spaces/<sharded space id>/blobs/<sharded blob id>.
func (bs *Blobstore) path(node *node.Node) (string, error) {
	if node.BlobID == "" {
		return "", fmt.Errorf("blobstore: BlobID is empty")
	}
	// Clean the space-relative part so sharded ids cannot escape the root.
	rel := filepath.Join("/", "spaces", lookup.Pathify(node.SpaceID, 1, 2), "blobs", lookup.Pathify(node.BlobID, 4, 2))
	return filepath.Join(bs.root, filepath.Clean(rel)), nil
}

View File

@@ -0,0 +1,375 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
package lookup
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
"github.com/cs3org/reva/v2/pkg/storage/utils/templates"
"github.com/pkg/errors"
"github.com/rogpeppe/go-internal/lockedfile"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)
// tracer is the package-level OpenTelemetry tracer, initialized in init().
var tracer trace.Tracer

// _spaceTypePersonal is the only space type GenerateSpaceID supports.
var _spaceTypePersonal = "personal"

func init() {
	tracer = otel.Tracer("github.com/cs3org/reva/pkg/storage/utils/decomposedfs/lookup")
}
// Lookup implements transformations from filepath to node and back.
type Lookup struct {
	// Options holds the decomposedfs options, e.g. the storage root and user layout.
	Options *options.Options

	// metadataBackend stores and retrieves node metadata (xattrs or message-pack/ini files).
	metadataBackend metadata.Backend
}

// New returns a new Lookup instance backed by the given metadata backend and options.
func New(b metadata.Backend, o *options.Options) *Lookup {
	return &Lookup{
		Options:         o,
		metadataBackend: b,
	}
}
// MetadataBackend returns the metadata backend this Lookup was constructed with.
func (lu *Lookup) MetadataBackend() metadata.Backend {
	return lu.metadataBackend
}
// ReadBlobSizeAttr reads the blobsize from the xattrs of the node at path.
func (lu *Lookup) ReadBlobSizeAttr(ctx context.Context, path string) (int64, error) {
	size, err := lu.metadataBackend.GetInt64(ctx, path, prefixes.BlobsizeAttr)
	if err != nil {
		return 0, errors.Wrapf(err, "error reading blobsize xattr")
	}
	return size, nil
}
// ReadBlobIDAttr reads the blob id from the xattrs of the node at path.
// (The previous comment wrongly said "blobsize".)
func (lu *Lookup) ReadBlobIDAttr(ctx context.Context, path string) (string, error) {
	attr, err := lu.metadataBackend.Get(ctx, path, prefixes.BlobIDAttr)
	if err != nil {
		return "", errors.Wrapf(err, "error reading blobid xattr")
	}
	return string(attr), nil
}
// TypeFromPath returns the type of the node at the given path.
// It prefers the type stored in the metadata backend and only falls back to
// an os.Lstat when the attribute cannot be read. On any stat error the
// type is RESOURCE_TYPE_INVALID.
func (lu *Lookup) TypeFromPath(ctx context.Context, path string) provider.ResourceType {
	// Try to read from xattrs
	typeAttr, err := lu.metadataBackend.GetInt64(ctx, path, prefixes.TypeAttr)
	if err == nil {
		return provider.ResourceType(int32(typeAttr))
	}

	t := provider.ResourceType_RESOURCE_TYPE_INVALID
	// Fall back to checking on disk
	fi, err := os.Lstat(path)
	if err != nil {
		return t
	}

	switch {
	case fi.IsDir():
		// a directory carrying a reference attribute represents a reference,
		// otherwise it is a plain container
		if _, err = lu.metadataBackend.Get(ctx, path, prefixes.ReferenceAttr); err == nil {
			t = provider.ResourceType_RESOURCE_TYPE_REFERENCE
		} else {
			t = provider.ResourceType_RESOURCE_TYPE_CONTAINER
		}
	case fi.Mode().IsRegular():
		t = provider.ResourceType_RESOURCE_TYPE_FILE
	case fi.Mode()&os.ModeSymlink != 0:
		t = provider.ResourceType_RESOURCE_TYPE_SYMLINK
		// TODO reference using ext attr on a symlink
		// nodeType = provider.ResourceType_RESOURCE_TYPE_REFERENCE
	}
	return t
}
// NodeFromResource takes in a request path or request id and converts it to a Node.
// The reference must carry a ResourceId; an optional relative Path is walked
// starting from that node. An invalid (id-less) reference returns an error.
func (lu *Lookup) NodeFromResource(ctx context.Context, ref *provider.Reference) (*node.Node, error) {
	ctx, span := tracer.Start(ctx, "NodeFromResource")
	defer span.End()

	if ref.ResourceId != nil {
		// check if a storage space reference is used
		// currently, the decomposed fs uses the root node id as the space id
		n, err := lu.NodeFromID(ctx, ref.ResourceId)
		if err != nil {
			return nil, err
		}
		// is this a relative reference?
		if ref.Path != "" {
			p := filepath.Clean(ref.Path)
			if p != "." && p != "/" {
				// walk the relative path
				n, err = lu.WalkPath(ctx, n, p, false, func(ctx context.Context, n *node.Node) error { return nil })
				if err != nil {
					return nil, err
				}
				// the walked node belongs to the same space as the reference
				n.SpaceID = ref.ResourceId.SpaceId
			}
		}
		return n, nil
	}

	// reference is invalid
	return nil, fmt.Errorf("invalid reference %+v. resource_id must be set", ref)
}
// NodeFromID returns the node for the given resource id.
// An empty OpaqueId addresses the root of the space identified by SpaceId.
func (lu *Lookup) NodeFromID(ctx context.Context, id *provider.ResourceId) (n *node.Node, err error) {
	ctx, span := tracer.Start(ctx, "NodeFromID")
	defer span.End()
	switch {
	case id == nil:
		return nil, fmt.Errorf("invalid resource id %+v", id)
	case id.OpaqueId == "":
		// The Resource references the root of a space
		return lu.NodeFromSpaceID(ctx, id.SpaceId)
	default:
		return node.ReadNode(ctx, lu, id.SpaceId, id.OpaqueId, false, nil, false)
	}
}
// Pathify segments the beginning of a string into depth segments of width length
// Pathify("aabbccdd", 3, 1) will return "a/a/b/bccdd"
func Pathify(id string, depth, width int) string {
	var sb strings.Builder
	offset := 0
	for seg := 0; seg < depth; seg++ {
		// stop segmenting once the remainder would be consumed entirely
		if len(id) <= offset+width {
			break
		}
		sb.WriteString(id[offset : offset+width])
		sb.WriteRune(filepath.Separator)
		offset += width
	}
	sb.WriteString(id[offset:])
	return sb.String()
}
// NodeFromSpaceID returns the root node of the space with the given id.
// (The space root is its own SpaceRoot.)
func (lu *Lookup) NodeFromSpaceID(ctx context.Context, spaceID string) (n *node.Node, err error) {
	// use a short local name instead of shadowing the node package
	n, err = node.ReadNode(ctx, lu, spaceID, spaceID, false, nil, false)
	if err != nil {
		return nil, err
	}
	n.SpaceRoot = n
	return n, nil
}
// Path returns the path for node, assembled by walking the parent chain up
// to the space root. NOTE(review): the permission check runs on the parent
// after ascending, so the walk is truncated at the first unreadable ancestor
// and the result may be relative to that ancestor rather than the root —
// confirm this is the intended contract.
func (lu *Lookup) Path(ctx context.Context, n *node.Node, hasPermission node.PermissionFunc) (p string, err error) {
	root := n.SpaceRoot
	for n.ID != root.ID {
		p = filepath.Join(n.Name, p)
		if n, err = n.Parent(ctx); err != nil {
			appctx.GetLogger(ctx).
				Error().Err(err).
				Str("path", p).
				Interface("node", n).
				Msg("Path()")
			// named returns: p is the partial path built so far, err is set
			return
		}
		if !hasPermission(n) {
			break
		}
	}
	p = filepath.Join("/", p)
	return
}
// WalkPath calls n.Child(segment) on every path segment in p starting at the node r.
// If a function f is given it will be executed for every segment node, but not the root node r.
// If followReferences is given the current visited reference node is replaced by the referenced node.
func (lu *Lookup) WalkPath(ctx context.Context, r *node.Node, p string, followReferences bool, f func(ctx context.Context, n *node.Node) error) (*node.Node, error) {
	segments := strings.Split(strings.Trim(p, "/"), "/")
	var err error
	for i := range segments {
		if r, err = r.Child(ctx, segments[i]); err != nil {
			return r, err
		}

		if followReferences {
			// NOTE(review): the err declared by this if-init shadows the outer
			// err; all assignments below are handled inside this scope.
			if attrBytes, err := r.Xattr(ctx, prefixes.ReferenceAttr); err == nil {
				realNodeID := attrBytes
				ref, err := refFromCS3(realNodeID)
				if err != nil {
					return nil, err
				}

				// jump to the referenced node
				r, err = lu.NodeFromID(ctx, ref.ResourceId)
				if err != nil {
					return nil, err
				}
			}
		}
		if r.IsSpaceRoot(ctx) {
			r.SpaceRoot = r
		}

		// only the final segment may be missing; a missing intermediate
		// segment aborts the walk with NotFound
		if !r.Exists && i < len(segments)-1 {
			return r, errtypes.NotFound(segments[i])
		}

		if f != nil {
			if err = f(ctx, r); err != nil {
				return r, err
			}
		}
	}
	return r, nil
}
// InternalRoot returns the internal storage root directory configured in the options.
func (lu *Lookup) InternalRoot() string {
	return lu.Options.Root
}
// InternalPath returns the internal path for a given ID:
// <root>/spaces/<space id sharded 1x2>/nodes/<node id sharded 4x2>.
func (lu *Lookup) InternalPath(spaceID, nodeID string) string {
	return filepath.Join(lu.Options.Root, "spaces", Pathify(spaceID, 1, 2), "nodes", Pathify(nodeID, 4, 2))
}
// SpacePath returns the path for the given space id below the storage root.
func (lu *Lookup) SpacePath(spaceID string) string {
	return filepath.Join(lu.Options.Root, spaceID)
}
// // ReferenceFromAttr returns a CS3 reference from xattr of a node.
// // Supported formats are: "cs3:storageid/nodeid"
// func ReferenceFromAttr(b []byte) (*provider.Reference, error) {
// return refFromCS3(b)
// }
// refFromCS3 creates a CS3 reference from a set of bytes. This method should remain private
// and only be called after validation because it can potentially panic:
// it skips a 4-byte prefix and expects at least one "/" in the remainder.
func refFromCS3(b []byte) (*provider.Reference, error) {
	parts := string(b[4:])
	// split once instead of twice (the previous code called strings.Split twice)
	segments := strings.Split(parts, "/")
	return &provider.Reference{
		ResourceId: &provider.ResourceId{
			StorageId: segments[0],
			OpaqueId:  segments[1],
		},
	}, nil
}
// CopyMetadata copies all extended attributes from source to target.
// The optional filter function can be used to filter by attribute name, e.g. by checking a prefix
// For the source file, a shared lock is acquired.
// NOTE: target resource will be write locked!
func (lu *Lookup) CopyMetadata(ctx context.Context, src, target string, filter func(attributeName string, value []byte) (newValue []byte, copy bool), acquireTargetLock bool) (err error) {
	// Acquire a read lock on the source node so its attributes cannot change
	// while they are being copied.
	lock, err := lockedfile.OpenFile(lu.MetadataBackend().LockfilePath(src), os.O_RDONLY|os.O_CREATE, 0600)
	if err != nil {
		// previously this wrap lived in a second, unreachable `if err != nil`
		// block after a bare `return err`; fold it into the single check
		return errors.Wrap(err, "xattrs: Unable to lock source to read")
	}
	defer func() {
		rerr := lock.Close()
		// if err is non nil we do not overwrite that
		if err == nil {
			err = rerr
		}
	}()

	return lu.CopyMetadataWithSourceLock(ctx, src, target, filter, lock, acquireTargetLock)
}
// CopyMetadataWithSourceLock copies all extended attributes from source to target.
// The optional filter function can be used to filter by attribute name, e.g. by checking a prefix
// For the source file, a matching lockedfile is required.
// NOTE: target resource will be write locked!
func (lu *Lookup) CopyMetadataWithSourceLock(ctx context.Context, sourcePath, targetPath string, filter func(attributeName string, value []byte) (newValue []byte, copy bool), lockedSource *lockedfile.File, acquireTargetLock bool) (err error) {
	// guard against misuse: the caller must hold the lockfile belonging to sourcePath
	switch {
	case lockedSource == nil:
		return errors.New("no lock provided")
	case lockedSource.File.Name() != lu.MetadataBackend().LockfilePath(sourcePath):
		return errors.New("lockpath does not match filepath")
	}

	attrs, err := lu.metadataBackend.All(ctx, sourcePath)
	if err != nil {
		return err
	}

	newAttrs := make(map[string][]byte, 0)
	for attrName, val := range attrs {
		if filter != nil {
			var ok bool
			// the filter may rewrite the value; a false second return drops the attribute
			if val, ok = filter(attrName, val); !ok {
				continue
			}
		}
		newAttrs[attrName] = val
	}

	return lu.MetadataBackend().SetMultiple(ctx, targetPath, newAttrs, acquireTargetLock)
}
// GenerateSpaceID generates a space id for the given space type and owner.
// Only personal spaces are supported; any other type is an error.
func (lu *Lookup) GenerateSpaceID(spaceType string, owner *user.User) (string, error) {
	if spaceType != _spaceTypePersonal {
		return "", fmt.Errorf("unsupported space type: %s", spaceType)
	}
	return templates.WithUser(owner, lu.Options.UserLayout), nil
}
// DetectBackendOnDisk returns the name of the metadata backend being used on disk.
// It inspects the last space found under <root>/spaces and probes for the
// backend-specific metadata file of its root node; "xattrs" is the default.
func DetectBackendOnDisk(root string) string {
	matches, _ := filepath.Glob(filepath.Join(root, "spaces", "*", "*"))
	if len(matches) == 0 {
		return "xattrs"
	}
	base := matches[len(matches)-1]
	// reassemble the space id from its sharded directory layout
	spaceID := strings.ReplaceAll(
		strings.TrimPrefix(base, filepath.Join(root, "spaces")),
		"/", "")
	spaceRoot := Pathify(spaceID, 4, 2)
	if _, err := os.Stat(filepath.Join(base, "nodes", spaceRoot+".mpk")); err == nil {
		return "mpk"
	}
	if _, err := os.Stat(filepath.Join(base, "nodes", spaceRoot+".ini")); err == nil {
		return "ini"
	}
	return "xattrs"
}

View File

@@ -0,0 +1,100 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
package posix
import (
"fmt"
"path"
microstore "go-micro.dev/v4/store"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/fs/posix/blobstore"
"github.com/cs3org/reva/v2/pkg/storage/fs/posix/lookup"
"github.com/cs3org/reva/v2/pkg/storage/fs/posix/tree"
"github.com/cs3org/reva/v2/pkg/storage/fs/registry"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/aspects"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/permissions"
"github.com/cs3org/reva/v2/pkg/store"
)
// init registers this driver under the name "posix" so it can be selected
// by name via the storage driver configuration.
func init() {
	registry.Register("posix", New)
}
// New returns an implementation of the storage.FS interface that talks to
// a local filesystem. It wires up the blobstore, the metadata lookup
// (xattrs or messagepack), the tree with its id cache, and the permissions
// service, then assembles them into a decomposedfs instance.
func New(m map[string]interface{}, stream events.Stream) (storage.FS, error) {
	o, err := options.New(m)
	if err != nil {
		return nil, err
	}

	// path.Join with a single element only cleans the configured root path
	bs, err := blobstore.New(path.Join(o.Root))
	if err != nil {
		return nil, err
	}

	var lu *lookup.Lookup
	switch o.MetadataBackend {
	case "xattrs":
		lu = lookup.New(metadata.XattrsBackend{}, o)
	case "messagepack":
		lu = lookup.New(metadata.NewMessagePackBackend(o.Root, o.FileMetadataCache), o)
	default:
		return nil, fmt.Errorf("unknown metadata backend %s, only 'messagepack' or 'xattrs' (default) supported", o.MetadataBackend)
	}

	tp := tree.New(lu, bs, o, store.Create(
		store.Store(o.IDCache.Store),
		store.TTL(o.IDCache.TTL),
		store.Size(o.IDCache.Size),
		microstore.Nodes(o.IDCache.Nodes...),
		microstore.Database(o.IDCache.Database),
		microstore.Table(o.IDCache.Table),
		store.DisablePersistence(o.IDCache.DisablePersistence),
		store.Authentication(o.IDCache.AuthUsername, o.IDCache.AuthPassword),
	))

	permissionsSelector, err := pool.PermissionsSelector(o.PermissionsSVC, pool.WithTLSMode(o.PermTLSMode))
	if err != nil {
		return nil, err
	}

	p := permissions.NewPermissions(node.NewPermissions(lu), permissionsSelector)

	// named fsAspects to avoid shadowing the imported "aspects" package
	fsAspects := aspects.Aspects{
		Lookup:      lu,
		Tree:        tp,
		Permissions: p,
		EventStream: stream,
	}

	// decomposedfs.New already returns (storage.FS, error); return it directly
	return decomposedfs.New(o, fsAspects)
}

View File

@@ -0,0 +1,799 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
package tree
import (
"bytes"
"context"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"regexp"
"strings"
"time"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/propagator"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"go-micro.dev/v4/store"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
)
// tracer is the package-level OpenTelemetry tracer used to annotate tree
// operations with spans.
var tracer trace.Tracer

func init() {
	tracer = otel.Tracer("github.com/cs3org/reva/pkg/storage/utils/decomposedfs/tree")
}
//go:generate make --no-print-directory -C ../../../../.. mockery NAME=Blobstore

// Blobstore defines an interface for storing blobs in a blobstore
type Blobstore interface {
	// Upload stores the content found at source as the blob of the given node.
	Upload(node *node.Node, source string) error
	// Download returns a reader for the node's blob; the caller must close it.
	Download(node *node.Node) (io.ReadCloser, error)
	// Delete removes the node's blob from the blobstore.
	Delete(node *node.Node) error
}
// Tree manages a hierarchical tree
type Tree struct {
	lookup     node.PathLookup       // resolves ids/references to paths and nodes
	blobstore  Blobstore             // stores file content
	propagator propagator.Propagator // propagates size/time changes up the tree
	options    *options.Options
	idCache    store.Store // caches directory-entry path -> node id mappings
}

// PermissionCheckFunc defined a function used to check resource permissions
type PermissionCheckFunc func(rp *provider.ResourcePermissions) bool
// New returns a new instance of Tree wired to the given lookup, blobstore,
// options and node-id cache.
func New(lu node.PathLookup, bs Blobstore, o *options.Options, cache store.Store) *Tree {
	t := &Tree{
		lookup:    lu,
		blobstore: bs,
		options:   o,
		idCache:   cache,
	}
	// the propagator implementation is chosen based on the options
	t.propagator = propagator.New(lu, o)
	return t
}
// Setup prepares the tree structure by making sure the configured root
// directory exists.
func (t *Tree) Setup() error {
	if err := os.MkdirAll(t.options.Root, 0700); err != nil {
		return err
	}
	return nil
}
// GetMD returns the metadata of a node in the tree by stating its internal
// path. A missing node is reported as errtypes.NotFound.
func (t *Tree) GetMD(ctx context.Context, n *node.Node) (os.FileInfo, error) {
	md, err := os.Stat(n.InternalPath())
	if err == nil {
		return md, nil
	}
	if errors.Is(err, fs.ErrNotExist) {
		return nil, errtypes.NotFound(n.ID)
	}
	return nil, errors.Wrap(err, "tree: error stating "+n.ID)
}
// TouchFile creates a new empty file. If the node already exists and
// markprocessing is set, it only flags the node as processing; otherwise an
// AlreadyExists error is returned. A missing node id is generated, the empty
// file is materialized on disk, the mtime (given or now) and the node
// metadata are written, and the change is propagated up the tree.
func (t *Tree) TouchFile(ctx context.Context, n *node.Node, markprocessing bool, mtime string) error {
	if n.Exists {
		if markprocessing {
			return n.SetXattr(ctx, prefixes.StatusPrefix, []byte(node.ProcessingStatus))
		}
		return errtypes.AlreadyExists(n.ID)
	}

	if n.ID == "" {
		n.ID = uuid.New().String()
	}
	n.SetType(provider.ResourceType_RESOURCE_TYPE_FILE)

	nodePath := n.InternalPath()
	if err := os.MkdirAll(filepath.Dir(nodePath), 0700); err != nil {
		return errors.Wrap(err, "Decomposedfs: error creating node")
	}
	f, err := os.Create(nodePath)
	if err != nil {
		return errors.Wrap(err, "Decomposedfs: error creating node")
	}
	// close immediately: only the empty file on disk is needed; keeping the
	// handle open would leak a file descriptor
	if err := f.Close(); err != nil {
		return errors.Wrap(err, "Decomposedfs: error creating node")
	}

	attributes := n.NodeMetadata(ctx)
	if markprocessing {
		attributes[prefixes.StatusPrefix] = []byte(node.ProcessingStatus)
	}
	if mtime != "" {
		if err := n.SetMtimeString(ctx, mtime); err != nil {
			return errors.Wrap(err, "Decomposedfs: could not set mtime")
		}
	} else {
		now := time.Now()
		if err := n.SetMtime(ctx, &now); err != nil {
			return errors.Wrap(err, "Decomposedfs: could not set mtime")
		}
	}
	err = n.SetXattrsWithContext(ctx, attributes, true)
	if err != nil {
		return err
	}

	// a new empty file has size 0, so no size diff to propagate
	return t.Propagate(ctx, n, 0)
}
// CreateDir creates a new directory entry in the tree and propagates the
// change. An existing node is reported as AlreadyExists.
func (t *Tree) CreateDir(ctx context.Context, n *node.Node) (err error) {
	ctx, span := tracer.Start(ctx, "CreateDir")
	defer span.End()

	if n.Exists {
		return errtypes.AlreadyExists(n.ID) // path?
	}

	// create a directory node; generate an id if the caller did not supply one
	n.SetType(provider.ResourceType_RESOURCE_TYPE_CONTAINER)
	if n.ID == "" {
		n.ID = uuid.New().String()
	}

	if err = t.createDirNode(ctx, n); err != nil {
		return err
	}
	return t.Propagate(ctx, n, 0)
}
// Move replaces the target with the source.
// Cross-space moves are rejected. An existing target is removed without
// trashing it. A same-parent move is a plain rename; otherwise the node is
// re-linked under the new parent and its size is re-propagated along both
// the old and the new ancestry.
func (t *Tree) Move(ctx context.Context, oldNode *node.Node, newNode *node.Node) (err error) {
	if oldNode.SpaceID != newNode.SpaceID {
		// WebDAV RFC https://www.rfc-editor.org/rfc/rfc4918#section-9.9.4 says to use
		// > 502 (Bad Gateway) - This may occur when the destination is on another
		// > server and the destination server refuses to accept the resource.
		// > This could also occur when the destination is on another sub-section
		// > of the same server namespace.
		// but we only have a not supported error
		return errtypes.NotSupported("cannot move across spaces")
	}
	// if target exists delete it without trashing it
	if newNode.Exists {
		// TODO make sure all children are deleted
		if err := os.RemoveAll(newNode.InternalPath()); err != nil {
			return errors.Wrap(err, "Decomposedfs: Move: error deleting target node "+newNode.ID)
		}
	}

	// remove cache entry in any case to avoid inconsistencies
	defer func() { _ = t.idCache.Delete(filepath.Join(oldNode.ParentPath(), oldNode.Name)) }()

	// Always target the old node ID for xattr updates.
	// The new node id is empty if the target does not exist
	// and we need to overwrite the new one when overwriting an existing path.

	// are we just renaming (parent stays the same)?
	if oldNode.ParentID == newNode.ParentID {
		// parentPath := t.lookup.InternalPath(oldNode.SpaceID, oldNode.ParentID)
		parentPath := oldNode.ParentPath()

		// rename child
		err = os.Rename(
			filepath.Join(parentPath, oldNode.Name),
			filepath.Join(parentPath, newNode.Name),
		)
		if err != nil {
			return errors.Wrap(err, "Decomposedfs: could not rename child")
		}

		// update name attribute
		if err := oldNode.SetXattrString(ctx, prefixes.NameAttr, newNode.Name); err != nil {
			return errors.Wrap(err, "Decomposedfs: could not set name attribute")
		}

		// a rename changes no sizes, so propagate with a zero size diff
		return t.Propagate(ctx, newNode, 0)
	}

	// we are moving the node to a new parent, any target has been removed
	// bring old node to the new parent

	// rename child
	err = os.Rename(
		filepath.Join(oldNode.ParentPath(), oldNode.Name),
		filepath.Join(newNode.ParentPath(), newNode.Name),
	)
	if err != nil {
		return errors.Wrap(err, "Decomposedfs: could not move child")
	}

	// update target parentid and name
	attribs := node.Attributes{}
	attribs.SetString(prefixes.ParentidAttr, newNode.ParentID)
	attribs.SetString(prefixes.NameAttr, newNode.Name)
	if err := oldNode.SetXattrsWithContext(ctx, attribs, true); err != nil {
		return errors.Wrap(err, "Decomposedfs: could not update old node attributes")
	}

	// the size diff is the current treesize or blobsize of the old/source node
	var sizeDiff int64
	if oldNode.IsDir(ctx) {
		treeSize, err := oldNode.GetTreeSize(ctx)
		if err != nil {
			return err
		}
		sizeDiff = int64(treeSize)
	} else {
		sizeDiff = oldNode.Blobsize
	}

	// TODO inefficient because we might update several nodes twice, only propagate unchanged nodes?
	// collect in a list, then only stat each node once
	// also do this in a go routine ... webdav should check the etag async

	// subtract the subtree size from the old ancestry, add it to the new one
	err = t.Propagate(ctx, oldNode, -sizeDiff)
	if err != nil {
		return errors.Wrap(err, "Decomposedfs: Move: could not propagate old node")
	}
	err = t.Propagate(ctx, newNode, sizeDiff)
	if err != nil {
		return errors.Wrap(err, "Decomposedfs: Move: could not propagate new node")
	}
	return nil
}
// readChildNodeFromLink resolves a directory-entry symlink at path and
// recovers the node id it points to by stripping the leading "/." characters
// and all remaining path separators from the link target.
func readChildNodeFromLink(ctx context.Context, path string) (string, error) {
	_, span := tracer.Start(ctx, "readChildNodeFromLink")
	defer span.End()

	target, err := os.Readlink(path)
	if err != nil {
		return "", err
	}
	id := strings.ReplaceAll(strings.TrimLeft(target, "/."), "/", "")
	return id, nil
}
// ListFolder lists the content of a folder node.
// Directory entries are read once, then resolved to nodes concurrently by a
// bounded worker pool (at most options.MaxConcurrency workers); node ids are
// looked up in the id cache and, on a miss, read from the entry symlink and
// cached. Denied resources are filtered out of the result.
func (t *Tree) ListFolder(ctx context.Context, n *node.Node) ([]*node.Node, error) {
	ctx, span := tracer.Start(ctx, "ListFolder")
	defer span.End()
	dir := n.InternalPath()

	_, subspan := tracer.Start(ctx, "os.Open")
	f, err := os.Open(dir)
	subspan.End()
	if err != nil {
		if errors.Is(err, fs.ErrNotExist) {
			return nil, errtypes.NotFound(dir)
		}
		return nil, errors.Wrap(err, "tree: error listing "+dir)
	}
	defer f.Close()

	_, subspan = tracer.Start(ctx, "f.Readdirnames")
	names, err := f.Readdirnames(0)
	subspan.End()
	if err != nil {
		return nil, err
	}

	// cap the worker count at the number of entries to avoid idle goroutines
	numWorkers := t.options.MaxConcurrency
	if len(names) < numWorkers {
		numWorkers = len(names)
	}

	work := make(chan string)
	results := make(chan *node.Node)

	g, ctx := errgroup.WithContext(ctx)

	// Distribute work
	g.Go(func() error {
		defer close(work)
		for _, name := range names {
			select {
			case work <- name:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	})

	// Spawn workers that'll concurrently work the queue
	for i := 0; i < numWorkers; i++ {
		g.Go(func() error {
			var err error
			for name := range work {
				path := filepath.Join(dir, name)
				nodeID := getNodeIDFromCache(ctx, path, t.idCache)
				if nodeID == "" {
					// cache miss: resolve the entry symlink and remember the id
					nodeID, err = readChildNodeFromLink(ctx, path)
					if err != nil {
						return err
					}
					err = storeNodeIDInCache(ctx, path, nodeID, t.idCache)
					if err != nil {
						return err
					}
				}

				child, err := node.ReadNode(ctx, t.lookup, n.SpaceID, nodeID, false, n.SpaceRoot, true)
				if err != nil {
					return err
				}

				// prevent listing denied resources
				if !child.IsDenied(ctx) {
					if child.SpaceRoot == nil {
						child.SpaceRoot = n.SpaceRoot
					}
					select {
					case results <- child:
					case <-ctx.Done():
						return ctx.Err()
					}
				}
			}
			return nil
		})
	}

	// Wait for things to settle down, then close results chan
	go func() {
		_ = g.Wait() // error is checked later
		close(results)
	}()

	// collect results until the channel is closed by the waiter goroutine
	retNodes := []*node.Node{}
	for n := range results {
		retNodes = append(retNodes, n)
	}

	// surface any worker error only after draining the results channel
	if err := g.Wait(); err != nil {
		return nil, err
	}

	return retNodes, nil
}
// Delete deletes a node in the tree by moving it to the trash.
// The sequence is: record the origin path as an xattr, create a trash symlink
// for the space, rename the node (and its metadata) to a trash-suffixed name,
// remove the parent directory entry, and propagate the negative size diff.
// Each failing step attempts to roll back the previous ones.
func (t *Tree) Delete(ctx context.Context, n *node.Node) (err error) {
	path := filepath.Join(n.ParentPath(), n.Name)
	// remove entry from cache immediately to avoid inconsistencies
	defer func() { _ = t.idCache.Delete(path) }()

	// when deleting a shared resource, only the directory entry is removed —
	// no trash handling
	deletingSharedResource := ctx.Value(appctx.DeletingSharedResource)
	if deletingSharedResource != nil && deletingSharedResource.(bool) {
		src := filepath.Join(n.ParentPath(), n.Name)
		return os.Remove(src)
	}

	// get the original path
	origin, err := t.lookup.Path(ctx, n, node.NoCheck)
	if err != nil {
		return
	}

	// set origin location in metadata
	nodePath := n.InternalPath()
	if err := n.SetXattrString(ctx, prefixes.TrashOriginAttr, origin); err != nil {
		return err
	}

	// the propagated diff is negative: the subtree leaves the accounted tree
	var sizeDiff int64
	if n.IsDir(ctx) {
		treesize, err := n.GetTreeSize(ctx)
		if err != nil {
			return err // TODO calculate treesize if it is not set
		}
		sizeDiff = -int64(treesize)
	} else {
		sizeDiff = -n.Blobsize
	}

	deletionTime := time.Now().UTC().Format(time.RFC3339Nano)

	// Prepare the trash
	trashLink := filepath.Join(t.options.Root, "spaces", lookup.Pathify(n.SpaceRoot.ID, 1, 2), "trash", lookup.Pathify(n.ID, 4, 2))
	if err := os.MkdirAll(filepath.Dir(trashLink), 0700); err != nil {
		// Roll back changes
		_ = n.RemoveXattr(ctx, prefixes.TrashOriginAttr, true)
		return err
	}

	// FIXME can we just move the node into the trash dir? instead of adding another symlink and appending a trash timestamp?
	// can we just use the mtime as the trash time?
	// TODO store a trashed by userid

	// first make node appear in the space trash
	// parent id and name are stored as extended attributes in the node itself
	err = os.Symlink("../../../../../nodes/"+lookup.Pathify(n.ID, 4, 2)+node.TrashIDDelimiter+deletionTime, trashLink)
	if err != nil {
		// Roll back changes
		_ = n.RemoveXattr(ctx, prefixes.TrashOriginAttr, true)
		return
	}

	// at this point we have a symlink pointing to a non existing destination, which is fine

	// rename the trashed node so it is not picked up when traversing up the tree and matches the symlink
	trashPath := nodePath + node.TrashIDDelimiter + deletionTime
	err = os.Rename(nodePath, trashPath)
	if err != nil {
		// To roll back changes
		// TODO remove symlink
		// Roll back changes
		_ = n.RemoveXattr(ctx, prefixes.TrashOriginAttr, true)
		return
	}
	// keep the metadata file in sync with the renamed node
	err = t.lookup.MetadataBackend().Rename(nodePath, trashPath)
	if err != nil {
		_ = n.RemoveXattr(ctx, prefixes.TrashOriginAttr, true)
		_ = os.Rename(trashPath, nodePath)
		return
	}

	// Remove lock file if it exists
	_ = os.Remove(n.LockFilePath())

	// finally remove the entry from the parent dir
	if err = os.Remove(path); err != nil {
		// To roll back changes
		// TODO revert the rename
		// TODO remove symlink
		// Roll back changes
		_ = n.RemoveXattr(ctx, prefixes.TrashOriginAttr, true)
		return
	}

	return t.Propagate(ctx, n, sizeDiff)
}
// RestoreRecycleItemFunc returns a node and a function to restore it from the trash.
// It resolves the trash key to the recycle node, determines the restore target
// (the recorded origin unless targetNode is given), checks locks, and returns
// the recycle node, the target's parent, and a closure that performs the
// actual restore: re-link under the parent, rename node + metadata back,
// update xattrs, remove the trash link, and propagate the restored size.
func (t *Tree) RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPath string, targetNode *node.Node) (*node.Node, *node.Node, func() error, error) {
	recycleNode, trashItem, deletedNodePath, origin, err := t.readRecycleItem(ctx, spaceid, key, trashPath)
	if err != nil {
		return nil, nil, nil, err
	}

	// the default restore target is the recorded origin, relative to the space root
	targetRef := &provider.Reference{
		ResourceId: &provider.ResourceId{SpaceId: spaceid, OpaqueId: spaceid},
		Path:       utils.MakeRelativePath(origin),
	}

	if targetNode == nil {
		targetNode, err = t.lookup.NodeFromResource(ctx, targetRef)
		if err != nil {
			return nil, nil, nil, err
		}
	}

	if err := targetNode.CheckLock(ctx); err != nil {
		return nil, nil, nil, err
	}

	parent, err := targetNode.Parent(ctx)
	if err != nil {
		return nil, nil, nil, err
	}

	// fn performs the actual restore; returned as a closure so the caller can
	// run it after additional checks
	fn := func() error {
		if targetNode.Exists {
			return errtypes.AlreadyExists("origin already exists")
		}

		// add the entry for the parent dir
		err = os.Symlink("../../../../../"+lookup.Pathify(recycleNode.ID, 4, 2), filepath.Join(targetNode.ParentPath(), targetNode.Name))
		if err != nil {
			return err
		}

		// rename to node only name, so it is picked up by id
		nodePath := recycleNode.InternalPath()

		// attempt to rename only if we're not in a subfolder
		if deletedNodePath != nodePath {
			err = os.Rename(deletedNodePath, nodePath)
			if err != nil {
				return err
			}
			err = t.lookup.MetadataBackend().Rename(deletedNodePath, nodePath)
			if err != nil {
				return err
			}
		}

		targetNode.Exists = true

		attrs := node.Attributes{}
		attrs.SetString(prefixes.NameAttr, targetNode.Name)
		if trashPath != "" {
			// set ParentidAttr to restorePath's node parent id
			attrs.SetString(prefixes.ParentidAttr, targetNode.ParentID)
		}

		if err = recycleNode.SetXattrsWithContext(ctx, attrs, true); err != nil {
			return errors.Wrap(err, "Decomposedfs: could not update recycle node")
		}

		// delete item link in trash
		deletePath := trashItem
		if trashPath != "" && trashPath != "/" {
			resolvedTrashRoot, err := filepath.EvalSymlinks(trashItem)
			if err != nil {
				return errors.Wrap(err, "Decomposedfs: could not resolve trash root")
			}
			deletePath = filepath.Join(resolvedTrashRoot, trashPath)
		}
		// best effort: the restore itself already succeeded at this point
		if err = os.Remove(deletePath); err != nil {
			log.Error().Err(err).Str("trashItem", trashItem).Msg("error deleting trash item")
		}

		// a restore adds the subtree's size back to the tree accounting
		var sizeDiff int64
		if recycleNode.IsDir(ctx) {
			treeSize, err := recycleNode.GetTreeSize(ctx)
			if err != nil {
				return err
			}
			sizeDiff = int64(treeSize)
		} else {
			sizeDiff = recycleNode.Blobsize
		}
		return t.Propagate(ctx, targetNode, sizeDiff)
	}
	return recycleNode, parent, fn, nil
}
// PurgeRecycleItemFunc returns a node and a function to purge it from the trash.
// The returned closure permanently removes the deleted node (including blob,
// metadata and revisions via removeNode) and then deletes the trash link.
func (t *Tree) PurgeRecycleItemFunc(ctx context.Context, spaceid, key string, path string) (*node.Node, func() error, error) {
	rn, trashItem, deletedNodePath, _, err := t.readRecycleItem(ctx, spaceid, key, path)
	if err != nil {
		return nil, nil, err
	}

	// fn performs the destructive purge; returned as a closure so the caller
	// can run it after additional checks
	fn := func() error {
		if err := t.removeNode(ctx, deletedNodePath, rn); err != nil {
			return err
		}

		// delete item link in trash
		deletePath := trashItem
		if path != "" && path != "/" {
			// a sub path points into the trashed subtree; resolve the trash
			// root first to delete the correct entry
			resolvedTrashRoot, err := filepath.EvalSymlinks(trashItem)
			if err != nil {
				return errors.Wrap(err, "Decomposedfs: could not resolve trash root")
			}
			deletePath = filepath.Join(resolvedTrashRoot, path)
		}
		if err = os.Remove(deletePath); err != nil {
			log.Error().Err(err).Str("deletePath", deletePath).Msg("error deleting trash item")
			return err
		}

		return nil
	}

	return rn, fn, nil
}
// removeNode permanently removes a node: the on-disk item at path, its
// metadata, its blob in the blobstore, and all revision nodes including
// their blobs. Failures are logged and returned.
func (t *Tree) removeNode(ctx context.Context, path string, n *node.Node) error {
	// delete the actual node
	if err := utils.RemoveItem(path); err != nil {
		log.Error().Err(err).Str("path", path).Msg("error purging node")
		return err
	}

	if err := t.lookup.MetadataBackend().Purge(path); err != nil {
		log.Error().Err(err).Str("path", t.lookup.MetadataBackend().MetadataPath(path)).Msg("error purging node metadata")
		return err
	}

	// delete blob from blobstore
	if n.BlobID != "" {
		if err := t.DeleteBlob(n); err != nil {
			log.Error().Err(err).Str("blobID", n.BlobID).Msg("error purging nodes blob")
			return err
		}
	}

	// delete revisions
	revs, err := filepath.Glob(n.InternalPath() + node.RevisionIDDelimiter + "*")
	if err != nil {
		log.Error().Err(err).Str("path", n.InternalPath()+node.RevisionIDDelimiter+"*").Msg("glob failed badly")
		return err
	}
	for _, rev := range revs {
		// skip metadata files here; they are handled by the backend
		if t.lookup.MetadataBackend().IsMetaFile(rev) {
			continue
		}

		bID, err := t.lookup.ReadBlobIDAttr(ctx, rev)
		if err != nil {
			log.Error().Err(err).Str("revision", rev).Msg("error reading blobid attribute")
			return err
		}

		if err := utils.RemoveItem(rev); err != nil {
			log.Error().Err(err).Str("revision", rev).Msg("error removing revision node")
			return err
		}

		if bID != "" {
			if err := t.DeleteBlob(&node.Node{SpaceID: n.SpaceID, BlobID: bID}); err != nil {
				log.Error().Err(err).Str("revision", rev).Str("blobID", bID).Msg("error removing revision node blob")
				return err
			}
		}
	}

	return nil
}
// Propagate propagates changes to the root of the tree by delegating to the
// configured propagator implementation.
func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) error {
	return t.propagator.Propagate(ctx, n, sizeDiff)
}
// WriteBlob writes a blob to the blobstore, handing over the file found at
// source for the given node.
func (t *Tree) WriteBlob(node *node.Node, source string) error {
	return t.blobstore.Upload(node, source)
}
// ReadBlob reads a blob from the blobstore. A node without a blob id is
// treated as a zero-byte file and served as empty content.
func (t *Tree) ReadBlob(node *node.Node) (io.ReadCloser, error) {
	if node.BlobID != "" {
		return t.blobstore.Download(node)
	}
	// there is no blob yet - we are dealing with a 0 byte file
	return io.NopCloser(bytes.NewReader(nil)), nil
}
// DeleteBlob deletes a blob from the blobstore. Both a nil node and a node
// without a blob id are rejected with an error.
func (t *Tree) DeleteBlob(node *node.Node) error {
	switch {
	case node == nil:
		return fmt.Errorf("could not delete blob, nil node was given")
	case node.BlobID == "":
		return fmt.Errorf("could not delete blob, node with empty blob id was given")
	}
	return t.blobstore.Delete(node)
}
// TODO check if node exists?
// createDirNode materializes the directory for n on disk and writes its
// initial metadata attributes.
func (t *Tree) createDirNode(ctx context.Context, n *node.Node) (err error) {
	ctx, span := tracer.Start(ctx, "createDirNode")
	defer span.End()

	// create a directory node
	if err := os.MkdirAll(n.InternalPath(), 0700); err != nil {
		return errors.Wrap(err, "Decomposedfs: error creating node")
	}

	attributes := n.NodeMetadata(ctx)
	attributes[prefixes.TreesizeAttr] = []byte("0") // initialize as empty, TODO why bother? if it is not set we could treat it as 0?
	if t.options.TreeTimeAccounting || t.options.TreeSizeAccounting {
		attributes[prefixes.PropagationAttr] = []byte("1") // mark the node for propagation
	}
	return n.SetXattrsWithContext(ctx, attributes, true)
}
// nodeIDRegep captures everything after the last ".../nodes/" segment up to
// the first dot; the captured group is the (pathified) node id.
var nodeIDRegep = regexp.MustCompile(`.*/nodes/([^.]*).*`)

// TODO refactor the returned params into Node properties? would make all the path transformations go away...

// readRecycleItem resolves a trash key (plus an optional sub path) to the
// recycle node, the trash symlink path, the on-disk path of the deleted node
// and its original location relative to the space root. If the origin cannot
// be read, "/" is returned and the item is restored to the space root.
func (t *Tree) readRecycleItem(ctx context.Context, spaceID, key, path string) (recycleNode *node.Node, trashItem string, deletedNodePath string, origin string, err error) {
	if key == "" {
		return nil, "", "", "", errtypes.InternalError("key is empty")
	}

	backend := t.lookup.MetadataBackend()
	var nodeID string

	trashItem = filepath.Join(t.lookup.InternalRoot(), "spaces", lookup.Pathify(spaceID, 1, 2), "trash", lookup.Pathify(key, 4, 2))
	resolvedTrashItem, err := filepath.EvalSymlinks(trashItem)
	if err != nil {
		return
	}
	deletedNodePath, err = filepath.EvalSymlinks(filepath.Join(resolvedTrashItem, path))
	if err != nil {
		return
	}
	// extract the node id from the resolved path and strip the path separators
	nodeID = nodeIDRegep.ReplaceAllString(deletedNodePath, "$1")
	nodeID = strings.ReplaceAll(nodeID, "/", "")

	recycleNode = node.New(spaceID, nodeID, "", "", 0, "", provider.ResourceType_RESOURCE_TYPE_INVALID, nil, t.lookup)
	recycleNode.SpaceRoot, err = node.ReadNode(ctx, t.lookup, spaceID, spaceID, false, nil, false)
	if err != nil {
		return
	}
	recycleNode.SetType(t.lookup.TypeFromPath(ctx, deletedNodePath))

	var attrBytes []byte
	if recycleNode.Type(ctx) == provider.ResourceType_RESOURCE_TYPE_FILE {
		// lookup blobID in extended attributes
		if attrBytes, err = backend.Get(ctx, deletedNodePath, prefixes.BlobIDAttr); err == nil {
			recycleNode.BlobID = string(attrBytes)
		} else {
			return
		}

		// lookup blobSize in extended attributes
		if recycleNode.Blobsize, err = backend.GetInt64(ctx, deletedNodePath, prefixes.BlobsizeAttr); err != nil {
			return
		}
	}

	// lookup parent id in extended attributes
	if attrBytes, err = backend.Get(ctx, deletedNodePath, prefixes.ParentidAttr); err == nil {
		recycleNode.ParentID = string(attrBytes)
	} else {
		return
	}

	// lookup name in extended attributes
	if attrBytes, err = backend.Get(ctx, deletedNodePath, prefixes.NameAttr); err == nil {
		recycleNode.Name = string(attrBytes)
	} else {
		return
	}

	// get origin node, is relative to space root
	origin = "/"

	// lookup origin path in extended attributes
	if attrBytes, err = backend.Get(ctx, resolvedTrashItem, prefixes.TrashOriginAttr); err == nil {
		origin = filepath.Join(string(attrBytes), path)
	} else {
		log.Error().Err(err).Str("trashItem", trashItem).Str("deletedNodePath", deletedNodePath).Msg("could not read origin path, restoring to /")
	}

	return
}
// getNodeIDFromCache looks up the node id for a directory-entry path in the
// cache; it returns "" on a miss or read error.
func getNodeIDFromCache(ctx context.Context, path string, cache store.Store) string {
	_, span := tracer.Start(ctx, "getNodeIDFromCache")
	defer span.End()

	if recs, err := cache.Read(path); err == nil && len(recs) > 0 {
		return string(recs[0].Value)
	}
	return ""
}
// storeNodeIDInCache records the node id for a directory-entry path in the
// cache so later lookups can skip resolving the entry symlink.
func storeNodeIDInCache(ctx context.Context, path string, nodeID string, cache store.Store) error {
	_, span := tracer.Start(ctx, "storeNodeIDInCache")
	defer span.End()

	rec := &store.Record{Key: path, Value: []byte(nodeID)}
	return cache.Write(rec)
}

View File

@@ -0,0 +1,33 @@
// Copyright 2018-2024 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
package aspects
import (
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/permissions"
)
// Aspects holds dependencies for handling aspects of the decomposedfs
type Aspects struct {
	Lookup      node.PathLookup         // path/id resolution component
	Tree        node.Tree               // tree manipulation component
	Permissions permissions.Permissions // permission checking component
	EventStream events.Stream           // stream for emitting events
}

View File

@@ -18,16 +18,11 @@
package decomposedfs
//go:generate make --no-print-directory -C ../../../.. mockery NAME=PermissionsChecker
//go:generate make --no-print-directory -C ../../../.. mockery NAME=CS3PermissionsClient
//go:generate make --no-print-directory -C ../../../.. mockery NAME=Tree
import (
"context"
"fmt"
"io"
"net/url"
"os"
"path"
"path/filepath"
"strconv"
@@ -47,11 +42,13 @@ import (
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/cache"
"github.com/cs3org/reva/v2/pkg/storage/utils/chunking"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/aspects"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/migrator"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/permissions"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/spaceidindex"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload"
@@ -83,28 +80,6 @@ func init() {
tracer = otel.Tracer("github.com/cs3org/reva/pkg/storage/utils/decomposedfs")
}
// Tree is used to manage a tree hierarchy
type Tree interface {
Setup() error
GetMD(ctx context.Context, node *node.Node) (os.FileInfo, error)
ListFolder(ctx context.Context, node *node.Node) ([]*node.Node, error)
// CreateHome(owner *userpb.UserId) (n *node.Node, err error)
CreateDir(ctx context.Context, node *node.Node) (err error)
TouchFile(ctx context.Context, node *node.Node, markprocessing bool, mtime string) error
// CreateReference(ctx context.Context, node *node.Node, targetURI *url.URL) error
Move(ctx context.Context, oldNode *node.Node, newNode *node.Node) (err error)
Delete(ctx context.Context, node *node.Node) (err error)
RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPath string, target *node.Node) (*node.Node, *node.Node, func() error, error)
PurgeRecycleItemFunc(ctx context.Context, spaceid, key, purgePath string) (*node.Node, func() error, error)
WriteBlob(node *node.Node, source string) error
ReadBlob(node *node.Node) (io.ReadCloser, error)
DeleteBlob(node *node.Node) error
Propagate(ctx context.Context, node *node.Node, sizeDiff int64) (err error)
}
// Session is the interface that OcisSession implements. By combining tus.Upload,
// storage.UploadSession and custom functions we can reuse the same struct throughout
// the whole upload lifecycle.
@@ -127,10 +102,10 @@ type SessionStore interface {
// Decomposedfs provides the base for decomposed filesystem implementations
type Decomposedfs struct {
lu *lookup.Lookup
tp Tree
lu node.PathLookup
tp node.Tree
o *options.Options
p Permissions
p permissions.Permissions
chunkHandler *chunking.ChunkHandler
stream events.Stream
cache cache.StatCache
@@ -175,23 +150,29 @@ func NewDefault(m map[string]interface{}, bs tree.Blobstore, es events.Stream) (
return nil, err
}
permissions := NewPermissions(node.NewPermissions(lu), permissionsSelector)
aspects := aspects.Aspects{
Lookup: lu,
Tree: tp,
Permissions: permissions.NewPermissions(node.NewPermissions(lu), permissionsSelector),
EventStream: es,
}
return New(o, lu, permissions, tp, es)
return New(o, aspects)
}
// New returns an implementation of the storage.FS interface that talks to
// a local filesystem.
func New(o *options.Options, lu *lookup.Lookup, p Permissions, tp Tree, es events.Stream) (storage.FS, error) {
func New(o *options.Options, aspects aspects.Aspects) (storage.FS, error) {
log := logger.New()
err := tp.Setup()
err := aspects.Tree.Setup()
if err != nil {
log.Error().Err(err).Msg("could not setup tree")
return nil, errors.Wrap(err, "could not setup tree")
}
// Run migrations & return
m := migrator.New(lu, log)
m := migrator.New(aspects.Lookup, log)
err = m.RunMigrations()
if err != nil {
log.Error().Err(err).Msg("could not migrate tree")
@@ -222,18 +203,18 @@ func New(o *options.Options, lu *lookup.Lookup, p Permissions, tp Tree, es event
}
fs := &Decomposedfs{
tp: tp,
lu: lu,
tp: aspects.Tree,
lu: aspects.Lookup,
o: o,
p: p,
p: aspects.Permissions,
chunkHandler: chunking.NewChunkHandler(filepath.Join(o.Root, "uploads")),
stream: es,
stream: aspects.EventStream,
cache: cache.GetStatCache(o.StatCache),
UserCache: ttlcache.NewCache(),
userSpaceIndex: userSpaceIndex,
groupSpaceIndex: groupSpaceIndex,
spaceTypeIndex: spaceTypeIndex,
sessionStore: upload.NewSessionStore(lu, tp, o.Root, es, o.AsyncFileUploads, o.Tokens),
sessionStore: upload.NewSessionStore(aspects.Lookup, aspects.Tree, o.Root, aspects.EventStream, o.AsyncFileUploads, o.Tokens),
}
if o.AsyncFileUploads {

View File

@@ -25,6 +25,7 @@ import (
"path/filepath"
"strings"
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
@@ -32,6 +33,7 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/rogpeppe/go-internal/lockedfile"
"go.opentelemetry.io/otel"
@@ -40,24 +42,14 @@ import (
var tracer trace.Tracer
const (
_spaceTypePersonal = "personal"
)
func init() {
tracer = otel.Tracer("github.com/cs3org/reva/pkg/storage/utils/decomposedfs/lookup")
}
// PathLookup defines the interface for the lookup component
type PathLookup interface {
NodeFromResource(ctx context.Context, ref *provider.Reference) (*node.Node, error)
NodeFromID(ctx context.Context, id *provider.ResourceId) (n *node.Node, err error)
InternalRoot() string
InternalPath(spaceID, nodeID string) string
Path(ctx context.Context, n *node.Node, hasPermission node.PermissionFunc) (path string, err error)
MetadataBackend() metadata.Backend
ReadBlobSizeAttr(ctx context.Context, path string) (int64, error)
ReadBlobIDAttr(ctx context.Context, path string) (string, error)
TypeFromPath(ctx context.Context, path string) provider.ResourceType
}
// Lookup implements transformations from filepath to node and back
type Lookup struct {
Options *options.Options
@@ -200,6 +192,16 @@ func (lu *Lookup) NodeFromSpaceID(ctx context.Context, spaceID string) (n *node.
return node, nil
}
// GenerateSpaceID returns the storage space id to use for a new space of
// the given type. Personal spaces deterministically reuse the owner's
// opaque user id (so each user maps to exactly one personal space); every
// other space type gets a freshly generated random UUID.
//
// NOTE(review): the error return is always nil here; it exists to satisfy
// the node.PathLookup interface — confirm other implementations can fail.
func (lu *Lookup) GenerateSpaceID(spaceType string, owner *user.User) (string, error) {
	switch spaceType {
	case _spaceTypePersonal:
		// Personal space id == owner's user id.
		return owner.Id.OpaqueId, nil
	default:
		return uuid.New().String(), nil
	}
}
// Path returns the path for node
func (lu *Lookup) Path(ctx context.Context, n *node.Node, hasPermission node.PermissionFunc) (p string, err error) {
root := n.SpaceRoot

View File

@@ -25,7 +25,7 @@ import (
"path/filepath"
"sort"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/rogpeppe/go-internal/lockedfile"
"github.com/rs/zerolog"
)
@@ -73,13 +73,13 @@ type Result string
// Migrator runs migrations on an existing decomposedfs
type Migrator struct {
lu *lookup.Lookup
lu node.PathLookup
states migrationStates
log *zerolog.Logger
}
// New returns a new Migrator instance
func New(lu *lookup.Lookup, log *zerolog.Logger) Migrator {
func New(lu node.PathLookup, log *zerolog.Logger) Migrator {
return Migrator{
lu: lu,
log: log,

View File

@@ -49,10 +49,13 @@ import (
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/rogpeppe/go-internal/lockedfile"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)
//go:generate make --no-print-directory -C ../../../../.. mockery NAME=Tree
var tracer trace.Tracer
func init() {
@@ -83,6 +86,46 @@ const (
ProcessingStatus = "processing:"
)
// Tree is used to manage a tree hierarchy of nodes backed by a blobstore.
// It covers directory/file lifecycle, recycle-bin handling, blob access and
// size/mtime propagation up the tree.
type Tree interface {
	// Setup prepares the backing storage; called once before the tree is used.
	Setup() error

	// GetMD returns the file metadata of the given node.
	GetMD(ctx context.Context, node *Node) (os.FileInfo, error)
	// ListFolder returns the child nodes of the given folder node.
	ListFolder(ctx context.Context, node *Node) ([]*Node, error)
	// CreateHome(owner *userpb.UserId) (n *Node, err error)
	// CreateDir creates a directory for the given node.
	CreateDir(ctx context.Context, node *Node) (err error)
	// TouchFile creates an empty file node. markprocessing flags the node as
	// still being processed; mtime presumably overrides the modification
	// time when non-empty — confirm against the implementation.
	TouchFile(ctx context.Context, node *Node, markprocessing bool, mtime string) error
	// CreateReference(ctx context.Context, node *Node, targetURI *url.URL) error
	// Move moves oldNode to the location described by newNode.
	Move(ctx context.Context, oldNode *Node, newNode *Node) (err error)
	// Delete moves the node into the recycle bin (see the Restore/Purge pair below).
	Delete(ctx context.Context, node *Node) (err error)
	// RestoreRecycleItemFunc resolves a trashed item and returns the restored
	// node, its parent, and a deferred function that performs the restore.
	RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPath string, target *Node) (*Node, *Node, func() error, error)
	// PurgeRecycleItemFunc resolves a trashed item and returns a deferred
	// function that permanently removes it.
	PurgeRecycleItemFunc(ctx context.Context, spaceid, key, purgePath string) (*Node, func() error, error)

	// WriteBlob stores the file at source as the blob content of the node.
	WriteBlob(node *Node, source string) error
	// ReadBlob returns a reader for the node's blob content.
	ReadBlob(node *Node) (io.ReadCloser, error)
	// DeleteBlob removes the node's blob from the blobstore.
	DeleteBlob(node *Node) error

	// Propagate updates mtimes/etags (and, with sizeDiff, tree sizes) from
	// the node up to the space root.
	Propagate(ctx context.Context, node *Node, sizeDiff int64) (err error)
}
// PathLookup defines the interface for the lookup component: it translates
// between references/ids and nodes, between nodes and internal filesystem
// paths, and exposes the metadata backend used to read node attributes.
type PathLookup interface {
	// NodeFromSpaceID resolves the root node of the space with the given id.
	NodeFromSpaceID(ctx context.Context, spaceID string) (n *Node, err error)
	// NodeFromResource resolves a CS3 reference (resource id + optional path) to a node.
	NodeFromResource(ctx context.Context, ref *provider.Reference) (*Node, error)
	// NodeFromID resolves a CS3 resource id to a node.
	NodeFromID(ctx context.Context, id *provider.ResourceId) (n *Node, err error)
	// GenerateSpaceID returns the space id to use for a new space of the
	// given type and owner (personal spaces reuse the owner's id).
	GenerateSpaceID(spaceType string, owner *userpb.User) (string, error)
	// InternalRoot returns the filesystem root of the decomposedfs storage.
	InternalRoot() string
	// InternalPath returns the on-disk path for the given space/node id pair.
	InternalPath(spaceID, nodeID string) string
	// Path returns the user-visible path of the node, walking up the tree;
	// hasPermission gates which ancestors may be revealed.
	Path(ctx context.Context, n *Node, hasPermission PermissionFunc) (path string, err error)
	// MetadataBackend returns the backend used to read/write node metadata.
	MetadataBackend() metadata.Backend
	// ReadBlobSizeAttr reads the blob size attribute stored at path.
	ReadBlobSizeAttr(ctx context.Context, path string) (int64, error)
	// ReadBlobIDAttr reads the blob id attribute stored at path.
	ReadBlobIDAttr(ctx context.Context, path string) (string, error)
	// TypeFromPath determines the resource type (file/container/...) of the path.
	TypeFromPath(ctx context.Context, path string) provider.ResourceType
	// CopyMetadataWithSourceLock copies metadata attributes accepted by filter
	// from sourcePath to targetPath while holding an already-acquired lock on
	// the source; acquireTargetLock controls locking of the target.
	CopyMetadataWithSourceLock(ctx context.Context, sourcePath, targetPath string, filter func(attributeName string, value []byte) (newValue []byte, copy bool), lockedSource *lockedfile.File, acquireTargetLock bool) (err error)
	// CopyMetadata is the convenience variant that acquires the source lock itself.
	CopyMetadata(ctx context.Context, src, target string, filter func(attributeName string, value []byte) (newValue []byte, copy bool), acquireTargetLock bool) (err error)
}
// Node represents a node in the tree and provides methods to get a Parent or Child instance
type Node struct {
SpaceID string
@@ -100,16 +143,6 @@ type Node struct {
nodeType *provider.ResourceType
}
// PathLookup defines the interface for the lookup component
type PathLookup interface {
InternalRoot() string
InternalPath(spaceID, nodeID string) string
Path(ctx context.Context, n *Node, hasPermission PermissionFunc) (path string, err error)
MetadataBackend() metadata.Backend
ReadBlobSizeAttr(ctx context.Context, path string) (int64, error)
ReadBlobIDAttr(ctx context.Context, path string) (string, error)
}
// New returns a new instance of Node
func New(spaceID, id, parentID, name string, blobsize int64, blobID string, t provider.ResourceType, owner *userpb.UserId, lu PathLookup) *Node {
if blobID == "" {

View File

@@ -1,4 +1,4 @@
package decomposedfs
package permissions
import (
"context"
@@ -11,9 +11,27 @@ import (
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/utils"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
)
//go:generate make --no-print-directory -C ../../../../.. mockery NAME=PermissionsChecker
//go:generate make --no-print-directory -C ../../../../.. mockery NAME=CS3PermissionsClient
var (
tracer trace.Tracer
)
func init() {
tracer = otel.Tracer("github.com/cs3org/reva/pkg/storage/utils/decomposedfs/permissions")
}
const (
_spaceTypePersonal = "personal"
_spaceTypeProject = "project"
)
// PermissionsChecker defines an interface for checking permissions on a Node
type PermissionsChecker interface {
AssemblePermissions(ctx context.Context, n *node.Node) (ap provider.ResourcePermissions, err error)

View File

@@ -44,11 +44,11 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/permissions"
"github.com/cs3org/reva/v2/pkg/storage/utils/filelocks"
"github.com/cs3org/reva/v2/pkg/storage/utils/templates"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/google/uuid"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
@@ -66,9 +66,13 @@ const (
// CreateStorageSpace creates a storage space
func (fs *Decomposedfs) CreateStorageSpace(ctx context.Context, req *provider.CreateStorageSpaceRequest) (*provider.CreateStorageSpaceResponse, error) {
ctx = context.WithValue(ctx, utils.SpaceGrant, struct{}{})
u := ctxpkg.ContextMustGetUser(ctx)
// "everything is a resource" this is the unique ID for the Space resource.
spaceID := uuid.New().String()
spaceID, err := fs.lu.GenerateSpaceID(req.Type, req.GetOwner())
if err != nil {
return nil, err
}
// allow sending a space id
if reqSpaceID := utils.ReadPlainFromOpaque(req.Opaque, "spaceid"); reqSpaceID != "" {
spaceID = reqSpaceID
@@ -77,14 +81,12 @@ func (fs *Decomposedfs) CreateStorageSpace(ctx context.Context, req *provider.Cr
description := utils.ReadPlainFromOpaque(req.Opaque, "description")
// allow sending a spaceAlias
alias := utils.ReadPlainFromOpaque(req.Opaque, "spaceAlias")
u := ctxpkg.ContextMustGetUser(ctx)
if alias == "" {
alias = templates.WithSpacePropertiesAndUser(u, req.Type, req.Name, fs.o.GeneralSpaceAliasTemplate)
}
// TODO enforce a uuid?
// TODO clarify if we want to enforce a single personal storage space or if we want to allow sending the spaceid
if req.Type == _spaceTypePersonal {
spaceID = req.GetOwner().GetId().GetOpaqueId()
alias = templates.WithSpacePropertiesAndUser(u, req.Type, req.Name, fs.o.PersonalSpaceAliasTemplate)
}
@@ -620,7 +622,7 @@ func (fs *Decomposedfs) UpdateStorageSpace(ctx context.Context, req *provider.Up
}
if !restore && len(metadata) == 0 && !IsViewer(sp) {
if !restore && len(metadata) == 0 && !permissions.IsViewer(sp) {
// you may land here when making an update request without changes
// check if user has access to the drive before continuing
return &provider.UpdateStorageSpaceResponse{
@@ -628,10 +630,10 @@ func (fs *Decomposedfs) UpdateStorageSpace(ctx context.Context, req *provider.Up
}, nil
}
if !IsManager(sp) {
if !permissions.IsManager(sp) {
// We are not a space manager. We need to check for additional permissions.
k := []string{prefixes.NameAttr, prefixes.SpaceDescriptionAttr}
if !IsEditor(sp) {
if !permissions.IsEditor(sp) {
k = append(k, prefixes.SpaceReadmeAttr, prefixes.SpaceAliasAttr, prefixes.SpaceImageAttr)
}
@@ -800,7 +802,7 @@ func (fs *Decomposedfs) storageSpaceFromNode(ctx context.Context, n *node.Node,
if n.SpaceRoot.IsDisabled(ctx) {
rp, err := fs.p.AssemblePermissions(ctx, n)
if err != nil || !IsManager(rp) {
if err != nil || !permissions.IsManager(rp) {
return nil, errtypes.PermissionDenied(fmt.Sprintf("user %s is not allowed to list deleted spaces %s", user.Username, n.ID))
}
}
@@ -1060,7 +1062,7 @@ func (fs *Decomposedfs) getSpaceRoot(spaceID string) string {
// - a user with the "delete-all-spaces" permission may delete but not enable/disable any project space
// - a user with the "Drive.ReadWriteEnabled" permission may enable/disable but not delete any project space
// - a project space can always be enabled/disabled/deleted by its manager (i.e. users have the "remove" grant)
func canDeleteSpace(ctx context.Context, spaceID string, typ string, purge bool, n *node.Node, p Permissions) error {
func canDeleteSpace(ctx context.Context, spaceID string, typ string, purge bool, n *node.Node, p permissions.Permissions) error {
// delete-all-home spaces allows to disable and delete a personal space
if typ == "personal" {
if p.DeleteAllHomeSpaces(ctx) {
@@ -1070,7 +1072,7 @@ func canDeleteSpace(ctx context.Context, spaceID string, typ string, purge bool,
}
// space managers are allowed to disable and delete their project spaces
if rp, err := p.AssemblePermissions(ctx, n); err == nil && IsManager(rp) {
if rp, err := p.AssemblePermissions(ctx, n); err == nil && permissions.IsManager(rp) {
return nil
}

View File

@@ -48,7 +48,7 @@ type AsyncPropagator struct {
treeSizeAccounting bool
treeTimeAccounting bool
propagationDelay time.Duration
lookup lookup.PathLookup
lookup node.PathLookup
}
// Change represents a change to the tree
@@ -58,7 +58,7 @@ type Change struct {
}
// NewAsyncPropagator returns a new AsyncPropagator instance
func NewAsyncPropagator(treeSizeAccounting, treeTimeAccounting bool, o options.AsyncPropagatorOptions, lookup lookup.PathLookup) AsyncPropagator {
func NewAsyncPropagator(treeSizeAccounting, treeTimeAccounting bool, o options.AsyncPropagatorOptions, lookup node.PathLookup) AsyncPropagator {
p := AsyncPropagator{
treeSizeAccounting: treeSizeAccounting,
treeTimeAccounting: treeTimeAccounting,

View File

@@ -26,7 +26,6 @@ import (
sprovider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
@@ -45,7 +44,7 @@ type Propagator interface {
Propagate(ctx context.Context, node *node.Node, sizediff int64) error
}
func New(lookup lookup.PathLookup, o *options.Options) Propagator {
func New(lookup node.PathLookup, o *options.Options) Propagator {
switch o.Propagator {
case "async":
return NewAsyncPropagator(o.TreeSizeAccounting, o.TreeTimeAccounting, o.AsyncPropagatorOptions, lookup)
@@ -54,7 +53,7 @@ func New(lookup lookup.PathLookup, o *options.Options) Propagator {
}
}
func calculateTreeSize(ctx context.Context, lookup lookup.PathLookup, childrenPath string) (uint64, error) {
func calculateTreeSize(ctx context.Context, lookup node.PathLookup, childrenPath string) (uint64, error) {
ctx, span := tracer.Start(ctx, "calculateTreeSize")
defer span.End()
var size uint64

View File

@@ -26,7 +26,6 @@ import (
"time"
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
@@ -37,11 +36,11 @@ import (
type SyncPropagator struct {
treeSizeAccounting bool
treeTimeAccounting bool
lookup lookup.PathLookup
lookup node.PathLookup
}
// NewSyncPropagator returns a new AsyncPropagator instance
func NewSyncPropagator(treeSizeAccounting, treeTimeAccounting bool, lookup lookup.PathLookup) SyncPropagator {
func NewSyncPropagator(treeSizeAccounting, treeTimeAccounting bool, lookup node.PathLookup) SyncPropagator {
return SyncPropagator{
treeSizeAccounting: treeSizeAccounting,
treeTimeAccounting: treeTimeAccounting,

View File

@@ -66,7 +66,7 @@ type Blobstore interface {
// Tree manages a hierarchical tree
type Tree struct {
lookup lookup.PathLookup
lookup node.PathLookup
blobstore Blobstore
propagator propagator.Propagator
@@ -79,7 +79,7 @@ type Tree struct {
type PermissionCheckFunc func(rp *provider.ResourcePermissions) bool
// New returns a new instance of Tree
func New(lu lookup.PathLookup, bs Blobstore, o *options.Options, cache store.Store) *Tree {
func New(lu node.PathLookup, bs Blobstore, o *options.Options, cache store.Store) *Tree {
return &Tree{
lookup: lu,
blobstore: bs,

View File

@@ -53,7 +53,7 @@ type PermissionsChecker interface {
// OcisStore manages upload sessions
type OcisStore struct {
lu *lookup.Lookup
lu node.PathLookup
tp Tree
root string
pub events.Publisher
@@ -62,7 +62,7 @@ type OcisStore struct {
}
// NewSessionStore returns a new OcisStore
func NewSessionStore(lu *lookup.Lookup, tp Tree, root string, pub events.Publisher, async bool, tknopts options.TokenOptions) *OcisStore {
func NewSessionStore(lu node.PathLookup, tp Tree, root string, pub events.Publisher, async bool, tknopts options.TokenOptions) *OcisStore {
return &OcisStore{
lu: lu,
tp: tp,

8
vendor/modules.txt vendored
View File

@@ -362,7 +362,7 @@ github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1
github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1
github.com/cs3org/go-cs3apis/cs3/tx/v1beta1
github.com/cs3org/go-cs3apis/cs3/types/v1beta1
# github.com/cs3org/reva/v2 v2.18.1-0.20240129131717-cff0a2eeb959
# github.com/cs3org/reva/v2 v2.18.1-0.20240205065033-2c21ada2ae52
## explicit; go 1.21
github.com/cs3org/reva/v2/cmd/revad/internal/grace
github.com/cs3org/reva/v2/cmd/revad/runtime
@@ -658,6 +658,10 @@ github.com/cs3org/reva/v2/pkg/storage/fs/ocis
github.com/cs3org/reva/v2/pkg/storage/fs/ocis/blobstore
github.com/cs3org/reva/v2/pkg/storage/fs/owncloudsql
github.com/cs3org/reva/v2/pkg/storage/fs/owncloudsql/filecache
github.com/cs3org/reva/v2/pkg/storage/fs/posix
github.com/cs3org/reva/v2/pkg/storage/fs/posix/blobstore
github.com/cs3org/reva/v2/pkg/storage/fs/posix/lookup
github.com/cs3org/reva/v2/pkg/storage/fs/posix/tree
github.com/cs3org/reva/v2/pkg/storage/fs/registry
github.com/cs3org/reva/v2/pkg/storage/fs/s3
github.com/cs3org/reva/v2/pkg/storage/fs/s3ng
@@ -670,6 +674,7 @@ github.com/cs3org/reva/v2/pkg/storage/utils/ace
github.com/cs3org/reva/v2/pkg/storage/utils/acl
github.com/cs3org/reva/v2/pkg/storage/utils/chunking
github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs
github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/aspects
github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup
github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata
github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes
@@ -677,6 +682,7 @@ github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/migrator
github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/mtimesyncedcache
github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node
github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options
github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/permissions
github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/spaceidindex
github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree
github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/propagator