feat(reva): bump reva

Signed-off-by: jkoberg <jkoberg@owncloud.com>
This commit is contained in:
jkoberg
2024-09-17 10:34:41 +02:00
parent ffe1946686
commit 0d5a1f189f
34 changed files with 1058 additions and 331 deletions

View File

@@ -0,0 +1,5 @@
Enhancement: Bump reva
Bumps reva version
https://github.com/owncloud/ocis/pull/9817

2
go.mod
View File

@@ -15,7 +15,7 @@ require (
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/coreos/go-oidc/v3 v3.11.0
github.com/cs3org/go-cs3apis v0.0.0-20240724121416-062c4e3046cb
github.com/cs3org/reva/v2 v2.24.1
github.com/cs3org/reva/v2 v2.24.2-0.20240917121936-fb394587b472
github.com/dhowden/tag v0.0.0-20230630033851-978a0926ee25
github.com/dutchcoders/go-clamd v0.0.0-20170520113014-b970184f4d9e
github.com/egirna/icap-client v0.1.1

4
go.sum
View File

@@ -255,8 +255,8 @@ github.com/crewjam/saml v0.4.14 h1:g9FBNx62osKusnFzs3QTN5L9CVA/Egfgm+stJShzw/c=
github.com/crewjam/saml v0.4.14/go.mod h1:UVSZCf18jJkk6GpWNVqcyQJMD5HsRugBPf4I1nl2mME=
github.com/cs3org/go-cs3apis v0.0.0-20240724121416-062c4e3046cb h1:KmYZDReplv/yfwc1LNYpDcVhVujC3Pasv6WjXx1haSU=
github.com/cs3org/go-cs3apis v0.0.0-20240724121416-062c4e3046cb/go.mod h1:yyP8PRo0EZou3nSH7H4qjlzQwaydPeIRNgX50npQHpE=
github.com/cs3org/reva/v2 v2.24.1 h1:rOzAuWxby1RR8FiMl67fgnryYvqwEYs+mPndVNAJTbk=
github.com/cs3org/reva/v2 v2.24.1/go.mod h1:p7CHBXcg6sSqB+0JMNDfC1S7TSh9FghXkw1kTV3KcJI=
github.com/cs3org/reva/v2 v2.24.2-0.20240917121936-fb394587b472 h1:dCgRDvgWefPh6oyqggQ4o7/AcrEAk4niihlX2ZnhGjk=
github.com/cs3org/reva/v2 v2.24.2-0.20240917121936-fb394587b472/go.mod h1:p7CHBXcg6sSqB+0JMNDfC1S7TSh9FghXkw1kTV3KcJI=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
github.com/cyphar/filepath-securejoin v0.2.4 h1:Ugdm7cg7i6ZK6x3xDF1oEu1nfkyfH53EtKeQYTC3kyg=
github.com/cyphar/filepath-securejoin v0.2.4/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4=

View File

@@ -178,7 +178,7 @@ func (s *svc) Handler() http.Handler {
var head string
head, r.URL.Path = router.ShiftPath(r.URL.Path)
log.Debug().Str("head", head).Str("tail", r.URL.Path).Msg("http routing")
log.Debug().Str("method", r.Method).Str("head", head).Str("tail", r.URL.Path).Msg("http routing")
switch head {
case "status.php", "status":
s.doStatus(w, r)

View File

@@ -180,8 +180,9 @@ type UploadReady struct {
SpaceOwner *user.UserId
ExecutingUser *user.User
FileRef *provider.Reference
Failed bool
Timestamp *types.Timestamp
Failed bool
IsVersion bool
// add reference here? We could use it to inform client pp is finished
}

View File

@@ -41,6 +41,11 @@ func New(root string) (*Blobstore, error) {
// Upload stores some data in the blobstore under the given key
func (bs *Blobstore) Upload(node *node.Node, source string) error {
path := node.InternalPath()
// preserve the mtime of the file
fi, _ := os.Stat(path)
file, err := os.Open(source)
if err != nil {
return errors.Wrap(err, "Decomposedfs: oCIS blobstore: Can not open source file to upload")
@@ -59,7 +64,15 @@ func (bs *Blobstore) Upload(node *node.Node, source string) error {
return errors.Wrapf(err, "could not write blob '%s'", node.InternalPath())
}
return w.Flush()
err = w.Flush()
if err != nil {
return err
}
if fi != nil {
return os.Chtimes(path, fi.ModTime(), fi.ModTime())
}
return nil
}
// Download retrieves a blob from the blobstore for reading

View File

@@ -72,15 +72,17 @@ type Lookup struct {
IDCache IDCache
metadataBackend metadata.Backend
userMapper usermapper.Mapper
tm node.TimeManager
}
// New returns a new Lookup instance
func New(b metadata.Backend, um usermapper.Mapper, o *options.Options) *Lookup {
func New(b metadata.Backend, um usermapper.Mapper, o *options.Options, tm node.TimeManager) *Lookup {
lu := &Lookup{
Options: o,
metadataBackend: b,
IDCache: NewStoreIDCache(&o.Options),
userMapper: um,
tm: tm,
}
return lu
@@ -122,22 +124,12 @@ func (lu *Lookup) MetadataBackend() metadata.Backend {
return lu.metadataBackend
}
// ReadBlobSizeAttr reads the blobsize from the xattrs
func (lu *Lookup) ReadBlobSizeAttr(ctx context.Context, path string) (int64, error) {
blobSize, err := lu.metadataBackend.GetInt64(ctx, path, prefixes.BlobsizeAttr)
func (lu *Lookup) ReadBlobIDAndSizeAttr(ctx context.Context, path string, _ node.Attributes) (string, int64, error) {
fi, err := os.Stat(path)
if err != nil {
return 0, errors.Wrapf(err, "error reading blobsize xattr")
return "", 0, errors.Wrap(err, "error stating file")
}
return blobSize, nil
}
// ReadBlobIDAttr reads the blobsize from the xattrs
func (lu *Lookup) ReadBlobIDAttr(ctx context.Context, path string) (string, error) {
attr, err := lu.metadataBackend.Get(ctx, path, prefixes.BlobIDAttr)
if err != nil {
return "", errors.Wrapf(err, "error reading blobid xattr")
}
return string(attr), nil
return "", fi.Size(), nil
}
// TypeFromPath returns the type of the node at the given path
@@ -421,3 +413,8 @@ func (lu *Lookup) GenerateSpaceID(spaceType string, owner *user.User) (string, e
return "", fmt.Errorf("unsupported space type: %s", spaceType)
}
}
// TimeManager returns the time manager
func (lu *Lookup) TimeManager() node.TimeManager {
return lu.tm
}

View File

@@ -19,6 +19,8 @@
package options
import (
"time"
decomposedoptions "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
@@ -29,6 +31,8 @@ type Options struct {
UseSpaceGroups bool `mapstructure:"use_space_groups"`
ScanDebounceDelay time.Duration `mapstructure:"scan_debounce_delay"`
WatchFS bool `mapstructure:"watch_fs"`
WatchType string `mapstructure:"watch_type"`
WatchPath string `mapstructure:"watch_path"`

View File

@@ -36,6 +36,8 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/fs/posix/blobstore"
"github.com/cs3org/reva/v2/pkg/storage/fs/posix/lookup"
"github.com/cs3org/reva/v2/pkg/storage/fs/posix/options"
"github.com/cs3org/reva/v2/pkg/storage/fs/posix/timemanager"
"github.com/cs3org/reva/v2/pkg/storage/fs/posix/trashbin"
"github.com/cs3org/reva/v2/pkg/storage/fs/posix/tree"
"github.com/cs3org/reva/v2/pkg/storage/fs/registry"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs"
@@ -68,24 +70,39 @@ func New(m map[string]interface{}, stream events.Stream) (storage.FS, error) {
return nil, err
}
bs, err := blobstore.New(o.Root)
if err != nil {
return nil, err
}
fs := &posixFS{}
um := usermapper.NewUnixMapper()
var lu *lookup.Lookup
switch o.MetadataBackend {
case "xattrs":
lu = lookup.New(metadata.NewXattrsBackend(o.Root, o.FileMetadataCache), um, o)
lu = lookup.New(metadata.NewXattrsBackend(o.Root, o.FileMetadataCache), um, o, &timemanager.Manager{})
case "messagepack":
lu = lookup.New(metadata.NewMessagePackBackend(o.Root, o.FileMetadataCache), um, o)
lu = lookup.New(metadata.NewMessagePackBackend(o.Root, o.FileMetadataCache), um, o, &timemanager.Manager{})
default:
return nil, fmt.Errorf("unknown metadata backend %s, only 'messagepack' or 'xattrs' (default) supported", o.MetadataBackend)
}
tp, err := tree.New(lu, bs, um, o, stream, store.Create(
trashbin, err := trashbin.New(o, lu)
if err != nil {
return nil, err
}
err = trashbin.Setup(fs)
if err != nil {
return nil, err
}
bs, err := blobstore.New(o.Root)
if err != nil {
return nil, err
}
switch o.IDCache.Store {
case "", "memory", "noop":
return nil, fmt.Errorf("the posix driver requires a shared id cache, e.g. nats-js-kv or redis")
}
tp, err := tree.New(lu, bs, um, trashbin, o, stream, store.Create(
store.Store(o.IDCache.Store),
store.TTL(o.IDCache.TTL),
store.Size(o.IDCache.Size),
@@ -113,6 +130,7 @@ func New(m map[string]interface{}, stream events.Stream) (storage.FS, error) {
EventStream: stream,
UserMapper: um,
DisableVersioning: true,
Trashbin: trashbin,
}
dfs, err := decomposedfs.New(&o.Options, aspects)
@@ -154,10 +172,8 @@ func New(m map[string]interface{}, stream events.Stream) (storage.FS, error) {
}
mw := middleware.NewFS(dfs, hooks...)
fs := &posixFS{
FS: mw,
um: um,
}
fs.FS = mw
fs.um = um
return fs, nil
}

View File

@@ -0,0 +1,118 @@
// Copyright 2018-2024 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
package timemanager
import (
"context"
"os"
"syscall"
"time"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
)
// Manager is responsible for managing time-related operations on files and directories.
type Manager struct {
}
// OverrideMtime overrides the modification time (mtime) of a node with the specified time.
func (m *Manager) OverrideMtime(ctx context.Context, n *node.Node, _ *node.Attributes, mtime time.Time) error {
return os.Chtimes(n.InternalPath(), mtime, mtime)
}
// MTime returns the modification time (mtime) of a node.
func (m *Manager) MTime(ctx context.Context, n *node.Node) (time.Time, error) {
fi, err := os.Stat(n.InternalPath())
if err != nil {
return time.Time{}, err
}
return fi.ModTime(), nil
}
// SetMTime sets the modification time (mtime) of a node to the specified time.
func (m *Manager) SetMTime(ctx context.Context, n *node.Node, mtime *time.Time) error {
return os.Chtimes(n.InternalPath(), *mtime, *mtime)
}
// TMTime returns the tree modification time (tmtime) of a node.
// If the tmtime is not set, it falls back to the modification time (mtime).
func (m *Manager) TMTime(ctx context.Context, n *node.Node) (time.Time, error) {
b, err := n.XattrString(ctx, prefixes.TreeMTimeAttr)
if err == nil {
return time.Parse(time.RFC3339Nano, b)
}
// no tmtime, use mtime
return m.MTime(ctx, n)
}
// SetTMTime sets the tree modification time (tmtime) of a node to the specified time.
// If tmtime is nil, the tmtime attribute is removed.
func (m *Manager) SetTMTime(ctx context.Context, n *node.Node, tmtime *time.Time) error {
if tmtime == nil {
return n.RemoveXattr(ctx, prefixes.TreeMTimeAttr, true)
}
return n.SetXattrString(ctx, prefixes.TreeMTimeAttr, tmtime.UTC().Format(time.RFC3339Nano))
}
// CTime returns the creation time (ctime) of a node.
func (m *Manager) CTime(ctx context.Context, n *node.Node) (time.Time, error) {
fi, err := os.Stat(n.InternalPath())
if err != nil {
return time.Time{}, err
}
stat := fi.Sys().(*syscall.Stat_t)
statCTime := StatCTime(stat)
//nolint:unconvert
return time.Unix(int64(statCTime.Sec), int64(statCTime.Nsec)), nil
}
// TCTime returns the tree creation time (tctime) of a node.
// Since decomposedfs does not differentiate between ctime and mtime, it falls back to TMTime.
func (m *Manager) TCTime(ctx context.Context, n *node.Node) (time.Time, error) {
// decomposedfs does not differentiate between ctime and mtime
return m.TMTime(ctx, n)
}
// SetTCTime sets the tree creation time (tctime) of a node to the specified time.
// Since decomposedfs does not differentiate between ctime and mtime, it falls back to SetTMTime.
func (m *Manager) SetTCTime(ctx context.Context, n *node.Node, tmtime *time.Time) error {
// decomposedfs does not differentiate between ctime and mtime
return m.SetTMTime(ctx, n, tmtime)
}
// DTime returns the deletion time (dtime) of a node.
func (m *Manager) DTime(ctx context.Context, n *node.Node) (tmTime time.Time, err error) {
b, err := n.XattrString(ctx, prefixes.DTimeAttr)
if err != nil {
return time.Time{}, err
}
return time.Parse(time.RFC3339Nano, b)
}
// SetDTime sets the deletion time (dtime) of a node to the specified time.
// If t is nil, the dtime attribute is removed.
func (m *Manager) SetDTime(ctx context.Context, n *node.Node, t *time.Time) (err error) {
if t == nil {
return n.RemoveXattr(ctx, prefixes.DTimeAttr, true)
}
return n.SetXattrString(ctx, prefixes.DTimeAttr, t.UTC().Format(time.RFC3339Nano))
}

View File

@@ -0,0 +1,10 @@
//go:build darwin || freebsd || netbsd || openbsd
package timemanager
import "syscall"
// StatCtime returns the change time
func StatCTime(st *syscall.Stat_t) syscall.Timespec {
return st.Ctimespec
}

View File

@@ -0,0 +1,10 @@
//go:build dragonfly || linux || solaris
package timemanager
import "syscall"
// StatCtime returns the change time
func StatCTime(st *syscall.Stat_t) syscall.Timespec {
return st.Ctim
}

View File

@@ -0,0 +1,307 @@
// Copyright 2018-2024 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
package trashbin
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"time"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/fs/posix/lookup"
"github.com/cs3org/reva/v2/pkg/storage/fs/posix/options"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/google/uuid"
)
type Trashbin struct {
fs storage.FS
o *options.Options
lu *lookup.Lookup
}
const (
trashHeader = `[Trash Info]`
timeFormat = "2006-01-02T15:04:05"
)
// New returns a new Trashbin
func New(o *options.Options, lu *lookup.Lookup) (*Trashbin, error) {
return &Trashbin{
o: o,
lu: lu,
}, nil
}
func (tb *Trashbin) writeInfoFile(trashPath, id, path string) error {
c := trashHeader
c += "\nPath=" + path
c += "\nDeletionDate=" + time.Now().Format(timeFormat)
return os.WriteFile(filepath.Join(trashPath, "info", id+".trashinfo"), []byte(c), 0644)
}
func (tb *Trashbin) readInfoFile(trashPath, id string) (string, *typesv1beta1.Timestamp, error) {
c, err := os.ReadFile(filepath.Join(trashPath, "info", id+".trashinfo"))
if err != nil {
return "", nil, err
}
var (
path string
ts *typesv1beta1.Timestamp
)
for _, line := range strings.Split(string(c), "\n") {
if strings.HasPrefix(line, "DeletionDate=") {
t, err := time.ParseInLocation(timeFormat, strings.TrimSpace(strings.TrimPrefix(line, "DeletionDate=")), time.Local)
if err != nil {
return "", nil, err
}
ts = utils.TimeToTS(t)
}
if strings.HasPrefix(line, "Path=") {
path = strings.TrimPrefix(line, "Path=")
}
}
return path, ts, nil
}
// Setup the trashbin
func (tb *Trashbin) Setup(fs storage.FS) error {
if tb.fs != nil {
return nil
}
tb.fs = fs
return nil
}
func trashRootForNode(n *node.Node) string {
return filepath.Join(n.SpaceRoot.InternalPath(), ".Trash")
}
func (tb *Trashbin) MoveToTrash(ctx context.Context, n *node.Node, path string) error {
key := uuid.New().String()
trashPath := trashRootForNode(n)
err := os.MkdirAll(filepath.Join(trashPath, "info"), 0755)
if err != nil {
return err
}
err = os.MkdirAll(filepath.Join(trashPath, "files"), 0755)
if err != nil {
return err
}
relPath := strings.TrimPrefix(path, n.SpaceRoot.InternalPath())
relPath = strings.TrimPrefix(relPath, "/")
err = tb.writeInfoFile(trashPath, key, relPath)
if err != nil {
return err
}
// purge metadata
if err = tb.lu.IDCache.DeleteByPath(ctx, path); err != nil {
return err
}
itemTrashPath := filepath.Join(trashPath, "files", key+".trashitem")
err = tb.lu.MetadataBackend().Rename(path, itemTrashPath)
if err != nil {
return err
}
return os.Rename(path, itemTrashPath)
}
// ListRecycle returns the list of available recycle items
// ref -> the space (= resourceid), key -> deleted node id, relativePath = relative to key
func (tb *Trashbin) ListRecycle(ctx context.Context, ref *provider.Reference, key, relativePath string) ([]*provider.RecycleItem, error) {
n, err := tb.lu.NodeFromResource(ctx, ref)
if err != nil {
return nil, err
}
trashRoot := trashRootForNode(n)
base := filepath.Join(trashRoot, "files")
var originalPath string
var ts *typesv1beta1.Timestamp
if key != "" {
// this is listing a specific item/folder
base = filepath.Join(base, key+".trashitem", relativePath)
originalPath, ts, err = tb.readInfoFile(trashRoot, key)
originalPath = filepath.Join(originalPath, relativePath)
if err != nil {
return nil, err
}
}
items := []*provider.RecycleItem{}
entries, err := os.ReadDir(filepath.Clean(base))
if err != nil {
switch err.(type) {
case *os.PathError:
return items, nil
default:
return nil, err
}
}
for _, entry := range entries {
var fi os.FileInfo
var entryOriginalPath string
var entryKey string
if strings.HasSuffix(entry.Name(), ".trashitem") {
entryKey = strings.TrimSuffix(entry.Name(), ".trashitem")
entryOriginalPath, ts, err = tb.readInfoFile(trashRoot, entryKey)
if err != nil {
continue
}
fi, err = entry.Info()
if err != nil {
continue
}
} else {
fi, err = os.Stat(filepath.Join(base, entry.Name()))
entryKey = entry.Name()
entryOriginalPath = filepath.Join(originalPath, entry.Name())
if err != nil {
continue
}
}
item := &provider.RecycleItem{
Key: filepath.Join(key, relativePath, entryKey),
Size: uint64(fi.Size()),
Ref: &provider.Reference{
ResourceId: &provider.ResourceId{
SpaceId: ref.GetResourceId().GetSpaceId(),
OpaqueId: ref.GetResourceId().GetSpaceId(),
},
Path: entryOriginalPath,
},
DeletionTime: ts,
}
if entry.IsDir() {
item.Type = provider.ResourceType_RESOURCE_TYPE_CONTAINER
} else {
item.Type = provider.ResourceType_RESOURCE_TYPE_FILE
}
items = append(items, item)
}
return items, nil
}
// RestoreRecycleItem restores the specified item
func (tb *Trashbin) RestoreRecycleItem(ctx context.Context, ref *provider.Reference, key, relativePath string, restoreRef *provider.Reference) error {
n, err := tb.lu.NodeFromResource(ctx, ref)
if err != nil {
return err
}
trashRoot := trashRootForNode(n)
trashPath := filepath.Clean(filepath.Join(trashRoot, "files", key+".trashitem", relativePath))
restoreBaseNode, err := tb.lu.NodeFromID(ctx, restoreRef.GetResourceId())
if err != nil {
return err
}
restorePath := filepath.Join(restoreBaseNode.InternalPath(), restoreRef.GetPath())
id, err := tb.lu.MetadataBackend().Get(ctx, trashPath, prefixes.IDAttr)
if err != nil {
return err
}
// update parent id in case it was restored to a different location
parentID, err := tb.lu.MetadataBackend().Get(ctx, filepath.Dir(restorePath), prefixes.IDAttr)
if err != nil {
return err
}
if len(parentID) == 0 {
return fmt.Errorf("trashbin: parent id not found for %s", restorePath)
}
err = tb.lu.MetadataBackend().Set(ctx, trashPath, prefixes.ParentidAttr, parentID)
if err != nil {
return err
}
// restore the item
err = os.Rename(trashPath, restorePath)
if err != nil {
return err
}
_ = tb.lu.CacheID(ctx, n.SpaceID, string(id), restorePath)
// cleanup trash info
if relativePath == "." || relativePath == "/" {
return os.Remove(filepath.Join(trashRoot, "info", key+".trashinfo"))
} else {
return nil
}
}
// PurgeRecycleItem purges the specified item, all its children and all their revisions
func (tb *Trashbin) PurgeRecycleItem(ctx context.Context, ref *provider.Reference, key, relativePath string) error {
n, err := tb.lu.NodeFromResource(ctx, ref)
if err != nil {
return err
}
trashRoot := trashRootForNode(n)
err = os.RemoveAll(filepath.Clean(filepath.Join(trashRoot, "files", key+".trashitem", relativePath)))
if err != nil {
return err
}
cleanPath := filepath.Clean(relativePath)
if cleanPath == "." || cleanPath == "/" {
return os.Remove(filepath.Join(trashRoot, "info", key+".trashinfo"))
}
return nil
}
// EmptyRecycle empties the trash
func (tb *Trashbin) EmptyRecycle(ctx context.Context, ref *provider.Reference) error {
n, err := tb.lu.NodeFromResource(ctx, ref)
if err != nil {
return err
}
trashRoot := trashRootForNode(n)
err = os.RemoveAll(filepath.Clean(filepath.Join(trashRoot, "files")))
if err != nil {
return err
}
return os.RemoveAll(filepath.Clean(filepath.Join(trashRoot, "info")))
}

View File

@@ -78,6 +78,11 @@ func NewScanDebouncer(d time.Duration, f func(item scanItem)) *ScanDebouncer {
// Debounce restarts the debounce timer for the given space
func (d *ScanDebouncer) Debounce(item scanItem) {
if d.after == 0 {
d.f(item)
return
}
d.mutex.Lock()
defer d.mutex.Unlock()
@@ -213,7 +218,7 @@ func (t *Tree) Scan(path string, action EventAction, isDir bool, recurse bool) e
func (t *Tree) HandleFileDelete(path string) error {
// purge metadata
_ = t.lookup.(*lookup.Lookup).IDCache.DeleteByPath(context.Background(), path)
_ = t.lookup.MetadataBackend().Purge(path)
_ = t.lookup.MetadataBackend().Purge(context.Background(), path)
// send event
owner, spaceID, nodeID, parentID, err := t.getOwnerAndIDs(filepath.Dir(path))
@@ -333,56 +338,75 @@ func (t *Tree) assimilate(item scanItem) error {
id, err = t.lookup.MetadataBackend().Get(context.Background(), item.Path, prefixes.IDAttr)
if err == nil {
previousPath, ok := t.lookup.(*lookup.Lookup).GetCachedID(context.Background(), spaceID, string(id))
// This item had already been assimilated in the past. Update the path
_ = t.lookup.(*lookup.Lookup).CacheID(context.Background(), spaceID, string(id), item.Path)
previousParentID, _ := t.lookup.MetadataBackend().Get(context.Background(), item.Path, prefixes.ParentidAttr)
fi, err := t.updateFile(item.Path, string(id), spaceID)
if err != nil {
return err
}
// was it moved?
// was it moved or copied/restored with a clashing id?
if ok && len(previousParentID) > 0 && previousPath != item.Path {
// purge original metadata. Only delete the path entry using DeletePath(reverse lookup), not the whole entry pair.
_ = t.lookup.(*lookup.Lookup).IDCache.DeletePath(context.Background(), previousPath)
_ = t.lookup.MetadataBackend().Purge(previousPath)
_, err := os.Stat(previousPath)
if err == nil {
// this id clashes with an existing id -> clear metadata and re-assimilate
if fi.IsDir() {
// if it was moved and it is a directory we need to propagate the move
go func() { _ = t.WarmupIDCache(item.Path, false) }()
}
_ = t.lookup.MetadataBackend().Purge(context.Background(), item.Path)
go func() {
_ = t.assimilate(scanItem{Path: item.Path, ForceRescan: true})
}()
} else {
// this is a move
_ = t.lookup.(*lookup.Lookup).CacheID(context.Background(), spaceID, string(id), item.Path)
_, err := t.updateFile(item.Path, string(id), spaceID)
if err != nil {
return err
}
parentID, err := t.lookup.MetadataBackend().Get(context.Background(), item.Path, prefixes.ParentidAttr)
if err == nil && len(parentID) > 0 {
ref := &provider.Reference{
ResourceId: &provider.ResourceId{
StorageId: t.options.MountID,
SpaceId: spaceID,
OpaqueId: string(parentID),
},
Path: filepath.Base(item.Path),
// purge original metadata. Only delete the path entry using DeletePath(reverse lookup), not the whole entry pair.
_ = t.lookup.(*lookup.Lookup).IDCache.DeletePath(context.Background(), previousPath)
_ = t.lookup.MetadataBackend().Purge(context.Background(), previousPath)
fi, err := os.Stat(item.Path)
if err != nil {
return err
}
oldRef := &provider.Reference{
ResourceId: &provider.ResourceId{
StorageId: t.options.MountID,
SpaceId: spaceID,
OpaqueId: string(previousParentID),
},
Path: filepath.Base(previousPath),
if fi.IsDir() {
// if it was moved and it is a directory we need to propagate the move
go func() { _ = t.WarmupIDCache(item.Path, false) }()
}
parentID, err := t.lookup.MetadataBackend().Get(context.Background(), item.Path, prefixes.ParentidAttr)
if err == nil && len(parentID) > 0 {
ref := &provider.Reference{
ResourceId: &provider.ResourceId{
StorageId: t.options.MountID,
SpaceId: spaceID,
OpaqueId: string(parentID),
},
Path: filepath.Base(item.Path),
}
oldRef := &provider.Reference{
ResourceId: &provider.ResourceId{
StorageId: t.options.MountID,
SpaceId: spaceID,
OpaqueId: string(previousParentID),
},
Path: filepath.Base(previousPath),
}
t.PublishEvent(events.ItemMoved{
SpaceOwner: user,
Executant: user,
Owner: user,
Ref: ref,
OldReference: oldRef,
Timestamp: utils.TSNow(),
})
}
t.PublishEvent(events.ItemMoved{
SpaceOwner: user,
Executant: user,
Owner: user,
Ref: ref,
OldReference: oldRef,
Timestamp: utils.TSNow(),
})
}
// }
} else {
// This item had already been assimilated in the past. Update the path
_ = t.lookup.(*lookup.Lookup).CacheID(context.Background(), spaceID, string(id), item.Path)
_, err := t.updateFile(item.Path, string(id), spaceID)
if err != nil {
return err
}
}
} else {
// assimilate new file
@@ -472,10 +496,6 @@ assimilate:
prefixes.IDAttr: []byte(id),
prefixes.NameAttr: []byte(filepath.Base(path)),
}
prevMtime, err := previousAttribs.Time(prefixes.MTimeAttr)
if err != nil || prevMtime.Before(fi.ModTime()) {
attributes[prefixes.MTimeAttr] = []byte(fi.ModTime().Format(time.RFC3339Nano))
}
if len(parentID) > 0 {
attributes[prefixes.ParentidAttr] = []byte(parentID)
}
@@ -496,25 +516,15 @@ assimilate:
attributes[prefixes.PropagationAttr] = []byte("1")
} else {
attributes.SetInt64(prefixes.TypeAttr, int64(provider.ResourceType_RESOURCE_TYPE_FILE))
attributes.SetString(prefixes.BlobIDAttr, id)
attributes.SetInt64(prefixes.BlobsizeAttr, fi.Size())
// propagate the change
sizeDiff := fi.Size()
if previousAttribs != nil && previousAttribs[prefixes.BlobsizeAttr] != nil {
oldSize, err := attributes.Int64(prefixes.BlobsizeAttr)
if err == nil {
sizeDiff -= oldSize
}
}
n := node.New(spaceID, id, parentID, filepath.Base(path), fi.Size(), "", provider.ResourceType_RESOURCE_TYPE_FILE, nil, t.lookup)
n.SpaceRoot = &node.Node{SpaceID: spaceID, ID: spaceID}
err = t.Propagate(context.Background(), n, sizeDiff)
if err != nil {
return nil, errors.Wrap(err, "failed to propagate")
}
}
n := node.New(spaceID, id, parentID, filepath.Base(path), fi.Size(), "", provider.ResourceType_RESOURCE_TYPE_FILE, nil, t.lookup)
n.SpaceRoot = &node.Node{SpaceID: spaceID, ID: spaceID}
err = t.Propagate(context.Background(), n, 0)
if err != nil {
return nil, errors.Wrap(err, "failed to propagate")
}
err = t.lookup.MetadataBackend().SetMultiple(context.Background(), path, attributes, false)
if err != nil {
return nil, errors.Wrap(err, "failed to set attributes")
@@ -547,15 +557,15 @@ func (t *Tree) WarmupIDCache(root string, assimilate bool) error {
sizes := make(map[string]int64)
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
// skip lock files
if isLockFile(path) {
return nil
}
if err != nil {
return err
}
// calculate tree sizes
if !info.IsDir() {
dir := filepath.Dir(path)
@@ -593,6 +603,16 @@ func (t *Tree) WarmupIDCache(root string, assimilate bool) error {
id, ok := attribs[prefixes.IDAttr]
if ok {
// Check if the item on the previous still exists. In this case it might have been a copy with extended attributes -> set new ID
previousPath, ok := t.lookup.(*lookup.Lookup).GetCachedID(context.Background(), string(spaceID), string(id))
if ok && previousPath != path {
// this id clashes with an existing id -> clear metadata and re-assimilate
_, err := os.Stat(previousPath)
if err == nil {
_ = t.lookup.MetadataBackend().Purge(context.Background(), path)
_ = t.assimilate(scanItem{Path: path, ForceRescan: true})
}
}
_ = t.lookup.(*lookup.Lookup).CacheID(context.Background(), string(spaceID), string(id), path)
}
} else if assimilate {
@@ -600,12 +620,14 @@ func (t *Tree) WarmupIDCache(root string, assimilate bool) error {
}
return nil
})
if err != nil {
return err
}
for dir, size := range sizes {
_ = t.lookup.MetadataBackend().Set(context.Background(), dir, prefixes.TreesizeAttr, []byte(fmt.Sprintf("%d", size)))
}
if err != nil {
return err
}
return nil
}

View File

@@ -59,6 +59,9 @@ start:
if err != nil {
continue
}
if isLockFile(ev.Path) || isTrash(ev.Path) {
continue
}
switch ev.Event {
case "CREATE":
go func() { _ = w.tree.Scan(ev.Path, ActionCreate, false, false) }()

View File

@@ -41,7 +41,7 @@ func (w *GpfsWatchFolderWatcher) Watch(topic string) {
continue
}
if isLockFile(lwev.Path) {
if isLockFile(lwev.Path) || isTrash(lwev.Path) {
continue
}

View File

@@ -42,7 +42,7 @@ func (iw *InotifyWatcher) Watch(path string) {
select {
case event := <-events:
for _, e := range event.Events {
if isLockFile(event.Filename) {
if isLockFile(event.Filename) || isTrash(event.Filename) {
continue
}
switch e {

View File

@@ -19,7 +19,6 @@
package tree
import (
"bytes"
"context"
"fmt"
"io"
@@ -28,7 +27,6 @@ import (
"path/filepath"
"regexp"
"strings"
"time"
"github.com/google/uuid"
"github.com/pkg/errors"
@@ -46,6 +44,7 @@ import (
"github.com/cs3org/reva/v2/pkg/logger"
"github.com/cs3org/reva/v2/pkg/storage/fs/posix/lookup"
"github.com/cs3org/reva/v2/pkg/storage/fs/posix/options"
"github.com/cs3org/reva/v2/pkg/storage/fs/posix/trashbin"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
@@ -82,6 +81,7 @@ type scanItem struct {
type Tree struct {
lookup node.PathLookup
blobstore Blobstore
trashbin *trashbin.Trashbin
propagator propagator.Propagator
options *options.Options
@@ -100,18 +100,19 @@ type Tree struct {
type PermissionCheckFunc func(rp *provider.ResourcePermissions) bool
// New returns a new instance of Tree
func New(lu node.PathLookup, bs Blobstore, um usermapper.Mapper, o *options.Options, es events.Stream, cache store.Store) (*Tree, error) {
func New(lu node.PathLookup, bs Blobstore, um usermapper.Mapper, trashbin *trashbin.Trashbin, o *options.Options, es events.Stream, cache store.Store) (*Tree, error) {
log := logger.New()
scanQueue := make(chan scanItem)
t := &Tree{
lookup: lu,
blobstore: bs,
userMapper: um,
trashbin: trashbin,
options: o,
idCache: cache,
propagator: propagator.New(lu, &o.Options),
scanQueue: scanQueue,
scanDebouncer: NewScanDebouncer(1000*time.Millisecond, func(item scanItem) {
scanDebouncer: NewScanDebouncer(o.ScanDebounceDelay, func(item scanItem) {
scanQueue <- item
}),
es: es,
@@ -229,14 +230,17 @@ func (t *Tree) TouchFile(ctx context.Context, n *node.Node, markprocessing bool,
if markprocessing {
attributes[prefixes.StatusPrefix] = []byte(node.ProcessingStatus)
}
nodeMTime := time.Now()
if mtime != "" {
nodeMTime, err = utils.MTimeToTime(mtime)
nodeMTime, err := utils.MTimeToTime(mtime)
if err != nil {
return err
}
err = os.Chtimes(nodePath, nodeMTime, nodeMTime)
if err != nil {
return err
}
}
attributes[prefixes.MTimeAttr] = []byte(nodeMTime.UTC().Format(time.RFC3339Nano))
err = n.SetXattrsWithContext(ctx, attributes, false)
if err != nil {
return err
@@ -297,18 +301,6 @@ func (t *Tree) Move(ctx context.Context, oldNode *node.Node, newNode *node.Node)
return errors.Wrap(err, "Decomposedfs: could not update old node attributes")
}
// the size diff is the current treesize or blobsize of the old/source node
var sizeDiff int64
if oldNode.IsDir(ctx) {
treeSize, err := oldNode.GetTreeSize(ctx)
if err != nil {
return err
}
sizeDiff = int64(treeSize)
} else {
sizeDiff = oldNode.Blobsize
}
// rename node
err = os.Rename(
filepath.Join(oldNode.ParentPath(), oldNode.Name),
@@ -334,7 +326,7 @@ func (t *Tree) Move(ctx context.Context, oldNode *node.Node, newNode *node.Node)
newNode.ID = oldNode.ID
}
_ = t.lookup.(*lookup.Lookup).CacheID(ctx, newNode.SpaceID, newNode.ID, filepath.Join(newNode.ParentPath(), newNode.Name))
// update id cache for the moved subtree
// update id cache for the moved subtree.
if oldNode.IsDir(ctx) {
err = t.WarmupIDCache(filepath.Join(newNode.ParentPath(), newNode.Name), false)
if err != nil {
@@ -342,15 +334,11 @@ func (t *Tree) Move(ctx context.Context, oldNode *node.Node, newNode *node.Node)
}
}
// TODO inefficient because we might update several nodes twice, only propagate unchanged nodes?
// collect in a list, then only stat each node once
// also do this in a go routine ... webdav should check the etag async
err = t.Propagate(ctx, oldNode, -sizeDiff)
err = t.Propagate(ctx, oldNode, 0)
if err != nil {
return errors.Wrap(err, "Decomposedfs: Move: could not propagate old node")
}
err = t.Propagate(ctx, newNode, sizeDiff)
err = t.Propagate(ctx, newNode, 0)
if err != nil {
return errors.Wrap(err, "Decomposedfs: Move: could not propagate new node")
}
@@ -394,7 +382,7 @@ func (t *Tree) ListFolder(ctx context.Context, n *node.Node) ([]*node.Node, erro
g.Go(func() error {
defer close(work)
for _, name := range names {
if isLockFile(name) {
if isLockFile(name) || isTrash(name) {
continue
}
@@ -469,7 +457,7 @@ func (t *Tree) ListFolder(ctx context.Context, n *node.Node) ([]*node.Node, erro
}
// Delete deletes a node in the tree by moving it to the trash
func (t *Tree) Delete(ctx context.Context, n *node.Node) (err error) {
func (t *Tree) Delete(ctx context.Context, n *node.Node) error {
path := n.InternalPath()
if !strings.HasPrefix(path, t.options.Root) {
@@ -500,28 +488,11 @@ func (t *Tree) Delete(ctx context.Context, n *node.Node) (err error) {
// Remove lock file if it exists
_ = os.Remove(n.LockFilePath())
// purge metadata
err = filepath.WalkDir(path, func(path string, _ fs.DirEntry, err error) error {
if err != nil {
return err
}
if err = t.lookup.(*lookup.Lookup).IDCache.DeleteByPath(ctx, path); err != nil {
return err
}
if err = t.lookup.MetadataBackend().Purge(path); err != nil {
return err
}
return nil
})
err := t.trashbin.MoveToTrash(ctx, n, path)
if err != nil {
return err
}
if err = os.RemoveAll(path); err != nil {
return
}
return t.Propagate(ctx, n, sizeDiff)
}
@@ -659,7 +630,7 @@ func (t *Tree) removeNode(ctx context.Context, path string, n *node.Node) error
return err
}
if err := t.lookup.MetadataBackend().Purge(path); err != nil {
if err := t.lookup.MetadataBackend().Purge(ctx, path); err != nil {
log.Error().Err(err).Str("path", t.lookup.MetadataBackend().MetadataPath(path)).Msg("error purging node metadata")
return err
}
@@ -673,42 +644,15 @@ func (t *Tree) removeNode(ctx context.Context, path string, n *node.Node) error
}
// delete revisions
revs, err := filepath.Glob(n.InternalPath() + node.RevisionIDDelimiter + "*")
if err != nil {
log.Error().Err(err).Str("path", n.InternalPath()+node.RevisionIDDelimiter+"*").Msg("glob failed badly")
return err
}
for _, rev := range revs {
if t.lookup.MetadataBackend().IsMetaFile(rev) {
continue
}
bID, err := t.lookup.ReadBlobIDAttr(ctx, rev)
if err != nil {
log.Error().Err(err).Str("revision", rev).Msg("error reading blobid attribute")
return err
}
if err := utils.RemoveItem(rev); err != nil {
log.Error().Err(err).Str("revision", rev).Msg("error removing revision node")
return err
}
if bID != "" {
if err := t.DeleteBlob(&node.Node{SpaceID: n.SpaceID, BlobID: bID}); err != nil {
log.Error().Err(err).Str("revision", rev).Str("blobID", bID).Msg("error removing revision node blob")
return err
}
}
}
// posixfs doesn't do revisions yet
return nil
}
// Propagate propagates changes to the root of the tree
func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err error) {
return t.propagator.Propagate(ctx, n, sizeDiff)
func (t *Tree) Propagate(ctx context.Context, n *node.Node, _ int64) (err error) {
// We do not propagate size diffs here but rely on the assimilation to take care of the tree sizes instead
return t.propagator.Propagate(ctx, n, 0)
}
// WriteBlob writes a blob to the blobstore
@@ -718,10 +662,6 @@ func (t *Tree) WriteBlob(node *node.Node, source string) error {
// ReadBlob reads a blob from the blobstore
func (t *Tree) ReadBlob(node *node.Node) (io.ReadCloser, error) {
if node.BlobID == "" {
// there is no blob yet - we are dealing with a 0 byte file
return io.NopCloser(bytes.NewReader([]byte{})), nil
}
return t.blobstore.Download(node)
}
@@ -730,10 +670,6 @@ func (t *Tree) DeleteBlob(node *node.Node) error {
if node == nil {
return fmt.Errorf("could not delete blob, nil node was given")
}
if node.BlobID == "" {
return fmt.Errorf("could not delete blob, node with empty blob id was given")
}
return t.blobstore.Delete(node)
}
@@ -886,3 +822,7 @@ func (t *Tree) readRecycleItem(ctx context.Context, spaceID, key, path string) (
func isLockFile(path string) bool {
return strings.HasSuffix(path, ".lock") || strings.HasSuffix(path, ".flock") || strings.HasSuffix(path, ".mlock")
}
func isTrash(path string) bool {
return strings.HasSuffix(path, ".trashinfo") || strings.HasSuffix(path, ".trashitem")
}

View File

@@ -22,6 +22,7 @@ import (
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/permissions"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/trashbin"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/usermapper"
)
@@ -29,6 +30,7 @@ import (
type Aspects struct {
Lookup node.PathLookup
Tree node.Tree
Trashbin trashbin.Trashbin
Permissions permissions.Permissions
EventStream events.Stream
DisableVersioning bool

View File

@@ -49,6 +49,8 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/permissions"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/spaceidindex"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/timemanager"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/trashbin"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/usermapper"
@@ -110,6 +112,7 @@ type SessionStore interface {
type Decomposedfs struct {
lu node.PathLookup
tp node.Tree
trashbin trashbin.Trashbin
o *options.Options
p permissions.Permissions
um usermapper.Mapper
@@ -133,9 +136,9 @@ func NewDefault(m map[string]interface{}, bs tree.Blobstore, es events.Stream) (
var lu *lookup.Lookup
switch o.MetadataBackend {
case "xattrs":
lu = lookup.New(metadata.NewXattrsBackend(o.Root, o.FileMetadataCache), o)
lu = lookup.New(metadata.NewXattrsBackend(o.Root, o.FileMetadataCache), o, &timemanager.Manager{})
case "messagepack":
lu = lookup.New(metadata.NewMessagePackBackend(o.Root, o.FileMetadataCache), o)
lu = lookup.New(metadata.NewMessagePackBackend(o.Root, o.FileMetadataCache), o, &timemanager.Manager{})
default:
return nil, fmt.Errorf("unknown metadata backend %s, only 'messagepack' or 'xattrs' (default) supported", o.MetadataBackend)
}
@@ -162,6 +165,7 @@ func NewDefault(m map[string]interface{}, bs tree.Blobstore, es events.Stream) (
Permissions: permissions.NewPermissions(node.NewPermissions(lu), permissionsSelector),
EventStream: es,
DisableVersioning: o.DisableVersioning,
Trashbin: &DecomposedfsTrashbin{},
}
return New(o, aspects)
@@ -209,6 +213,9 @@ func New(o *options.Options, aspects aspects.Aspects) (storage.FS, error) {
return nil, err
}
if aspects.Trashbin == nil {
return nil, errors.New("need trashbin")
}
// set a null usermapper if we don't have one
if aspects.UserMapper == nil {
aspects.UserMapper = &usermapper.NullMapper{}
@@ -217,6 +224,7 @@ func New(o *options.Options, aspects aspects.Aspects) (storage.FS, error) {
fs := &Decomposedfs{
tp: aspects.Tree,
lu: aspects.Lookup,
trashbin: aspects.Trashbin,
o: o,
p: aspects.Permissions,
um: aspects.UserMapper,
@@ -228,6 +236,9 @@ func New(o *options.Options, aspects aspects.Aspects) (storage.FS, error) {
spaceTypeIndex: spaceTypeIndex,
}
fs.sessionStore = upload.NewSessionStore(fs, aspects, o.Root, o.AsyncFileUploads, o.Tokens)
if err = fs.trashbin.Setup(fs); err != nil {
return nil, err
}
if o.AsyncFileUploads {
if fs.stream == nil {
@@ -347,6 +358,14 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
fs.sessionStore.Cleanup(ctx, session, revertNodeMetadata, keepUpload, unmarkPostprocessing)
var isVersion bool
if session.NodeExists() {
info, err := session.GetInfo(ctx)
if err == nil && info.MetaData["versionsPath"] != "" {
isVersion = true
}
}
if err := events.Publish(
ctx,
fs.stream,
@@ -365,6 +384,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
},
Timestamp: utils.TimeToTS(now),
SpaceOwner: n.SpaceOwnerOrManager(ctx),
IsVersion: isVersion,
},
); err != nil {
sublog.Error().Err(err).Msg("Failed to publish UploadReady event")
@@ -1201,3 +1221,16 @@ func (fs *Decomposedfs) Unlock(ctx context.Context, ref *provider.Reference, loc
return node.Unlock(ctx, lock)
}
func (fs *Decomposedfs) ListRecycle(ctx context.Context, ref *provider.Reference, key, relativePath string) ([]*provider.RecycleItem, error) {
return fs.trashbin.ListRecycle(ctx, ref, key, relativePath)
}
func (fs *Decomposedfs) RestoreRecycleItem(ctx context.Context, ref *provider.Reference, key, relativePath string, restoreRef *provider.Reference) error {
return fs.trashbin.RestoreRecycleItem(ctx, ref, key, relativePath, restoreRef)
}
func (fs *Decomposedfs) PurgeRecycleItem(ctx context.Context, ref *provider.Reference, key, relativePath string) error {
return fs.trashbin.PurgeRecycleItem(ctx, ref, key, relativePath)
}
func (fs *Decomposedfs) EmptyRecycle(ctx context.Context, ref *provider.Reference) error {
return fs.trashbin.EmptyRecycle(ctx, ref)
}

View File

@@ -55,13 +55,15 @@ type Lookup struct {
Options *options.Options
metadataBackend metadata.Backend
tm node.TimeManager
}
// New returns a new Lookup instance
func New(b metadata.Backend, o *options.Options) *Lookup {
func New(b metadata.Backend, o *options.Options, tm node.TimeManager) *Lookup {
return &Lookup{
Options: o,
metadataBackend: b,
tm: tm,
}
}
@@ -70,23 +72,34 @@ func (lu *Lookup) MetadataBackend() metadata.Backend {
return lu.metadataBackend
}
// ReadBlobSizeAttr reads the blobsize from the xattrs
func (lu *Lookup) ReadBlobSizeAttr(ctx context.Context, path string) (int64, error) {
blobSize, err := lu.metadataBackend.GetInt64(ctx, path, prefixes.BlobsizeAttr)
if err != nil {
return 0, errors.Wrapf(err, "error reading blobsize xattr")
func (lu *Lookup) ReadBlobIDAndSizeAttr(ctx context.Context, path string, attrs node.Attributes) (string, int64, error) {
blobID := ""
blobSize := int64(0)
var err error
if attrs != nil {
blobID = attrs.String(prefixes.BlobIDAttr)
if blobID != "" {
blobSize, err = attrs.Int64(prefixes.BlobsizeAttr)
if err != nil {
return "", 0, err
}
}
} else {
attrs, err := lu.metadataBackend.All(ctx, path)
if err != nil {
return "", 0, errors.Wrapf(err, "error reading blobid xattr")
}
nodeAttrs := node.Attributes(attrs)
blobID = nodeAttrs.String(prefixes.BlobIDAttr)
blobSize, err = nodeAttrs.Int64(prefixes.BlobsizeAttr)
if err != nil {
return "", 0, errors.Wrapf(err, "error reading blobsize xattr")
}
}
return blobSize, nil
return blobID, blobSize, nil
}
// ReadBlobIDAttr reads the blobsize from the xattrs
func (lu *Lookup) ReadBlobIDAttr(ctx context.Context, path string) (string, error) {
attr, err := lu.metadataBackend.Get(ctx, path, prefixes.BlobIDAttr)
if err != nil {
return "", errors.Wrapf(err, "error reading blobid xattr")
}
return string(attr), nil
}
func readChildNodeFromLink(path string) (string, error) {
link, err := os.Readlink(path)
if err != nil {
@@ -369,6 +382,11 @@ func (lu *Lookup) CopyMetadataWithSourceLock(ctx context.Context, sourcePath, ta
return lu.MetadataBackend().SetMultiple(ctx, targetPath, newAttrs, acquireTargetLock)
}
// TimeManager returns the time manager
func (lu *Lookup) TimeManager() node.TimeManager {
return lu.tm
}
// DetectBackendOnDisk returns the name of the metadata backend being used on disk
func DetectBackendOnDisk(root string) string {
matches, _ := filepath.Glob(filepath.Join(root, "spaces", "*", "*"))

View File

@@ -273,7 +273,7 @@ func (MessagePackBackend) IsMetaFile(path string) bool {
}
// Purge purges the data of a given path
func (b MessagePackBackend) Purge(path string) error {
func (b MessagePackBackend) Purge(_ context.Context, path string) error {
if err := b.metaCache.RemoveMetadata(b.cacheKey(path)); err != nil {
return err
}

View File

@@ -51,7 +51,7 @@ type Backend interface {
Remove(ctx context.Context, path, key string, acquireLock bool) error
Lock(path string) (UnlockFunc, error)
Purge(path string) error
Purge(ctx context.Context, path string) error
Rename(oldPath, newPath string) error
IsMetaFile(path string) bool
MetadataPath(path string) string
@@ -111,7 +111,7 @@ func (NullBackend) Lock(path string) (UnlockFunc, error) {
func (NullBackend) IsMetaFile(path string) bool { return false }
// Purge purges the data of a given path from any cache that might hold it
func (NullBackend) Purge(purges string) error { return errUnconfiguredError }
func (NullBackend) Purge(_ context.Context, purges string) error { return errUnconfiguredError }
// Rename moves the data for a given path to a new path
func (NullBackend) Rename(oldPath, newPath string) error { return errUnconfiguredError }

View File

@@ -27,6 +27,7 @@ import (
"strings"
"github.com/cs3org/reva/v2/pkg/storage/cache"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/filelocks"
"github.com/pkg/errors"
"github.com/pkg/xattr"
@@ -213,7 +214,24 @@ func (b XattrsBackend) Remove(ctx context.Context, path string, key string, acqu
func (XattrsBackend) IsMetaFile(path string) bool { return strings.HasSuffix(path, ".meta.lock") }
// Purge purges the data of a given path
func (b XattrsBackend) Purge(path string) error {
func (b XattrsBackend) Purge(ctx context.Context, path string) error {
_, err := os.Stat(path)
if err == nil {
attribs, err := b.getAll(ctx, path, true)
if err != nil {
return err
}
for attr := range attribs {
if strings.HasPrefix(attr, prefixes.OcisPrefix) {
err := xattr.Remove(path, attr)
if err != nil {
return err
}
}
}
}
return b.metaCache.RemoveMetadata(b.cacheKey(path))
}

View File

@@ -23,6 +23,7 @@ import (
"crypto/md5"
"crypto/sha1"
"encoding/hex"
"encoding/json"
"fmt"
"hash"
"hash/adler32"
@@ -84,6 +85,29 @@ const (
ProcessingStatus = "processing:"
)
type TimeManager interface {
// OverrideMTime overrides the mtime of the node, either on the node itself or in the given attributes, depending on the implementation
OverrideMtime(ctx context.Context, n *Node, attrs *Attributes, mtime time.Time) error
// MTime returns the mtime of the node
MTime(ctx context.Context, n *Node) (time.Time, error)
// SetMTime sets the mtime of the node
SetMTime(ctx context.Context, n *Node, mtime *time.Time) error
// TMTime returns the tmtime of the node
TMTime(ctx context.Context, n *Node) (time.Time, error)
// SetTMTime sets the tmtime of the node
SetTMTime(ctx context.Context, n *Node, tmtime *time.Time) error
// CTime returns the ctime of the node
CTime(ctx context.Context, n *Node) (time.Time, error)
// DTime returns the deletion time of the node
DTime(ctx context.Context, n *Node) (time.Time, error)
// SetDTime sets the deletion time of the node
SetDTime(ctx context.Context, n *Node, mtime *time.Time) error
}
// Tree is used to manage a tree hierarchy
type Tree interface {
Setup() error
@@ -125,8 +149,8 @@ type PathLookup interface {
InternalPath(spaceID, nodeID string) string
Path(ctx context.Context, n *Node, hasPermission PermissionFunc) (path string, err error)
MetadataBackend() metadata.Backend
ReadBlobSizeAttr(ctx context.Context, path string) (int64, error)
ReadBlobIDAttr(ctx context.Context, path string) (string, error)
TimeManager() TimeManager
ReadBlobIDAndSizeAttr(ctx context.Context, path string, attrs Attributes) (string, int64, error)
TypeFromPath(ctx context.Context, path string) provider.ResourceType
CopyMetadataWithSourceLock(ctx context.Context, sourcePath, targetPath string, filter func(attributeName string, value []byte) (newValue []byte, copy bool), lockedSource *lockedfile.File, acquireTargetLock bool) (err error)
CopyMetadata(ctx context.Context, src, target string, filter func(attributeName string, value []byte) (newValue []byte, copy bool), acquireTargetLock bool) (err error)
@@ -172,6 +196,26 @@ func New(spaceID, id, parentID, name string, blobsize int64, blobID string, t pr
}
}
func (n *Node) MarshalJSON() ([]byte, error) {
return json.Marshal(&struct {
Name string `json:"name"`
ID string `json:"id"`
SpaceID string `json:"spaceID"`
ParentID string `json:"parentID"`
BlobID string `json:"blobID"`
BlobSize int64 `json:"blobSize"`
Exists bool `json:"exists"`
}{
Name: n.Name,
ID: n.ID,
SpaceID: n.SpaceID,
ParentID: n.ParentID,
BlobID: n.BlobID,
BlobSize: n.Blobsize,
Exists: n.Exists,
})
}
// Type returns the node's resource type
func (n *Node) Type(ctx context.Context) provider.ResourceType {
if n.nodeType != nil {
@@ -351,22 +395,12 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis
}
if revisionSuffix == "" {
n.BlobID = attrs.String(prefixes.BlobIDAttr)
if n.BlobID != "" {
blobSize, err := attrs.Int64(prefixes.BlobsizeAttr)
if err != nil {
return nil, err
}
n.Blobsize = blobSize
}
} else {
n.BlobID, err = lu.ReadBlobIDAttr(ctx, nodePath+revisionSuffix)
n.BlobID, n.Blobsize, err = lu.ReadBlobIDAndSizeAttr(ctx, nodePath, attrs)
if err != nil {
return nil, err
}
// Lookup blobsize
n.Blobsize, err = lu.ReadBlobSizeAttr(ctx, nodePath+revisionSuffix)
} else {
n.BlobID, n.Blobsize, err = lu.ReadBlobIDAndSizeAttr(ctx, nodePath+revisionSuffix, nil)
if err != nil {
return nil, err
}
@@ -899,55 +933,6 @@ func (n *Node) HasPropagation(ctx context.Context) (propagation bool) {
return false
}
// GetTMTime reads the tmtime from the extended attributes, falling back to GetMTime()
func (n *Node) GetTMTime(ctx context.Context) (time.Time, error) {
b, err := n.XattrString(ctx, prefixes.TreeMTimeAttr)
if err == nil {
return time.Parse(time.RFC3339Nano, b)
}
// no tmtime, use mtime
return n.GetMTime(ctx)
}
// GetMTime reads the mtime from the extended attributes, falling back to disk
func (n *Node) GetMTime(ctx context.Context) (time.Time, error) {
b, err := n.XattrString(ctx, prefixes.MTimeAttr)
if err != nil {
fi, err := os.Lstat(n.InternalPath())
if err != nil {
return time.Time{}, err
}
return fi.ModTime(), nil
}
return time.Parse(time.RFC3339Nano, b)
}
// SetTMTime writes the UTC tmtime to the extended attributes or removes the attribute if nil is passed
func (n *Node) SetTMTime(ctx context.Context, t *time.Time) (err error) {
if t == nil {
return n.RemoveXattr(ctx, prefixes.TreeMTimeAttr, true)
}
return n.SetXattrString(ctx, prefixes.TreeMTimeAttr, t.UTC().Format(time.RFC3339Nano))
}
// GetDTime reads the dtime from the extended attributes
func (n *Node) GetDTime(ctx context.Context) (tmTime time.Time, err error) {
b, err := n.XattrString(ctx, prefixes.DTimeAttr)
if err != nil {
return time.Time{}, err
}
return time.Parse(time.RFC3339Nano, b)
}
// SetDTime writes the UTC dtime to the extended attributes or removes the attribute if nil is passed
func (n *Node) SetDTime(ctx context.Context, t *time.Time) (err error) {
if t == nil {
return n.RemoveXattr(ctx, prefixes.DTimeAttr, true)
}
return n.SetXattrString(ctx, prefixes.DTimeAttr, t.UTC().Format(time.RFC3339Nano))
}
// IsDisabled returns true when the node has a dmtime attribute set
// only used to check if a space is disabled
// FIXME confusing with the trash logic
@@ -1378,3 +1363,28 @@ func CalculateChecksums(ctx context.Context, path string) (hash.Hash, hash.Hash,
return sha1h, md5h, adler32h, nil
}
// GetMTime reads the mtime from the extended attributes
func (n *Node) GetMTime(ctx context.Context) (time.Time, error) {
return n.lu.TimeManager().MTime(ctx, n)
}
// GetTMTime reads the tmtime from the extended attributes
func (n *Node) GetTMTime(ctx context.Context) (time.Time, error) {
return n.lu.TimeManager().TMTime(ctx, n)
}
// SetTMTime writes the UTC tmtime to the extended attributes or removes the attribute if nil is passed
func (n *Node) SetTMTime(ctx context.Context, t *time.Time) (err error) {
return n.lu.TimeManager().SetTMTime(ctx, n, t)
}
// GetDTime reads the dmtime from the extended attributes
func (n *Node) GetDTime(ctx context.Context) (time.Time, error) {
return n.lu.TimeManager().DTime(ctx, n)
}
// SetDTime writes the UTC dmtime to the extended attributes or removes the attribute if nil is passed
func (n *Node) SetDTime(ctx context.Context, t *time.Time) (err error) {
return n.lu.TimeManager().SetDTime(ctx, n, t)
}

View File

@@ -33,12 +33,26 @@ import (
types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storagespace"
)
type DecomposedfsTrashbin struct {
fs *Decomposedfs
}
// Setup the trashbin
func (tb *DecomposedfsTrashbin) Setup(fs storage.FS) error {
if _, ok := fs.(*Decomposedfs); !ok {
return errors.New("invalid filesystem")
}
tb.fs = fs.(*Decomposedfs)
return nil
}
// Recycle items are stored inside the node folder and start with the uuid of the deleted node.
// The `.T.` indicates it is a trash item and what follows is the timestamp of the deletion.
// The deleted file is kept in the same location/dir as the original node. This prevents deletes
@@ -49,7 +63,7 @@ import (
// ListRecycle returns the list of available recycle items
// ref -> the space (= resourceid), key -> deleted node id, relativePath = relative to key
func (fs *Decomposedfs) ListRecycle(ctx context.Context, ref *provider.Reference, key, relativePath string) ([]*provider.RecycleItem, error) {
func (tb *DecomposedfsTrashbin) ListRecycle(ctx context.Context, ref *provider.Reference, key, relativePath string) ([]*provider.RecycleItem, error) {
if ref == nil || ref.ResourceId == nil || ref.ResourceId.OpaqueId == "" {
return nil, errtypes.BadRequest("spaceid required")
@@ -62,11 +76,11 @@ func (fs *Decomposedfs) ListRecycle(ctx context.Context, ref *provider.Reference
sublog := appctx.GetLogger(ctx).With().Str("spaceid", spaceID).Str("key", key).Str("relative_path", relativePath).Logger()
// check permissions
trashnode, err := fs.lu.NodeFromSpaceID(ctx, spaceID)
trashnode, err := tb.fs.lu.NodeFromSpaceID(ctx, spaceID)
if err != nil {
return nil, err
}
rp, err := fs.p.AssembleTrashPermissions(ctx, trashnode)
rp, err := tb.fs.p.AssembleTrashPermissions(ctx, trashnode)
switch {
case err != nil:
return nil, err
@@ -78,13 +92,13 @@ func (fs *Decomposedfs) ListRecycle(ctx context.Context, ref *provider.Reference
}
if key == "" && relativePath == "" {
return fs.listTrashRoot(ctx, spaceID)
return tb.listTrashRoot(ctx, spaceID)
}
// build a list of trash items relative to the given trash root and path
items := make([]*provider.RecycleItem, 0)
trashRootPath := filepath.Join(fs.getRecycleRoot(spaceID), lookup.Pathify(key, 4, 2))
trashRootPath := filepath.Join(tb.getRecycleRoot(spaceID), lookup.Pathify(key, 4, 2))
originalPath, _, timeSuffix, err := readTrashLink(trashRootPath)
if err != nil {
sublog.Error().Err(err).Str("trashRoot", trashRootPath).Msg("error reading trash link")
@@ -92,7 +106,7 @@ func (fs *Decomposedfs) ListRecycle(ctx context.Context, ref *provider.Reference
}
origin := ""
attrs, err := fs.lu.MetadataBackend().All(ctx, originalPath)
attrs, err := tb.fs.lu.MetadataBackend().All(ctx, originalPath)
if err != nil {
return items, err
}
@@ -118,21 +132,21 @@ func (fs *Decomposedfs) ListRecycle(ctx context.Context, ref *provider.Reference
var size int64
if relativePath == "" {
// this is the case when we want to directly list a file in the trashbin
nodeType := fs.lu.TypeFromPath(ctx, originalPath)
nodeType := tb.fs.lu.TypeFromPath(ctx, originalPath)
switch nodeType {
case provider.ResourceType_RESOURCE_TYPE_FILE:
size, err = fs.lu.ReadBlobSizeAttr(ctx, originalPath)
_, size, err = tb.fs.lu.ReadBlobIDAndSizeAttr(ctx, originalPath, nil)
if err != nil {
return items, err
}
case provider.ResourceType_RESOURCE_TYPE_CONTAINER:
size, err = fs.lu.MetadataBackend().GetInt64(ctx, originalPath, prefixes.TreesizeAttr)
size, err = tb.fs.lu.MetadataBackend().GetInt64(ctx, originalPath, prefixes.TreesizeAttr)
if err != nil {
return items, err
}
}
item := &provider.RecycleItem{
Type: fs.lu.TypeFromPath(ctx, originalPath),
Type: tb.fs.lu.TypeFromPath(ctx, originalPath),
Size: uint64(size),
Key: filepath.Join(key, relativePath),
DeletionTime: deletionTime,
@@ -165,16 +179,16 @@ func (fs *Decomposedfs) ListRecycle(ctx context.Context, ref *provider.Reference
// reset size
size = 0
nodeType := fs.lu.TypeFromPath(ctx, resolvedChildPath)
nodeType := tb.fs.lu.TypeFromPath(ctx, resolvedChildPath)
switch nodeType {
case provider.ResourceType_RESOURCE_TYPE_FILE:
size, err = fs.lu.ReadBlobSizeAttr(ctx, resolvedChildPath)
_, size, err = tb.fs.lu.ReadBlobIDAndSizeAttr(ctx, resolvedChildPath, nil)
if err != nil {
sublog.Error().Err(err).Str("name", name).Msg("invalid blob size, skipping")
continue
}
case provider.ResourceType_RESOURCE_TYPE_CONTAINER:
size, err = fs.lu.MetadataBackend().GetInt64(ctx, resolvedChildPath, prefixes.TreesizeAttr)
size, err = tb.fs.lu.MetadataBackend().GetInt64(ctx, resolvedChildPath, prefixes.TreesizeAttr)
if err != nil {
sublog.Error().Err(err).Str("name", name).Msg("invalid tree size, skipping")
continue
@@ -218,16 +232,16 @@ func readTrashLink(path string) (string, string, string, error) {
return resolved, link[15:51], link[54:], nil
}
func (fs *Decomposedfs) listTrashRoot(ctx context.Context, spaceID string) ([]*provider.RecycleItem, error) {
func (tb *DecomposedfsTrashbin) listTrashRoot(ctx context.Context, spaceID string) ([]*provider.RecycleItem, error) {
log := appctx.GetLogger(ctx)
trashRoot := fs.getRecycleRoot(spaceID)
trashRoot := tb.getRecycleRoot(spaceID)
items := []*provider.RecycleItem{}
subTrees, err := filepath.Glob(trashRoot + "/*")
if err != nil {
return nil, err
}
numWorkers := fs.o.MaxConcurrency
numWorkers := tb.fs.o.MaxConcurrency
if len(subTrees) < numWorkers {
numWorkers = len(subTrees)
}
@@ -273,13 +287,13 @@ func (fs *Decomposedfs) listTrashRoot(ctx context.Context, spaceID string) ([]*p
continue
}
attrs, err := fs.lu.MetadataBackend().All(ctx, nodePath)
attrs, err := tb.fs.lu.MetadataBackend().All(ctx, nodePath)
if err != nil {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not get extended attributes, skipping")
continue
}
nodeType := fs.lu.TypeFromPath(ctx, nodePath)
nodeType := tb.fs.lu.TypeFromPath(ctx, nodePath)
if nodeType == provider.ResourceType_RESOURCE_TYPE_INVALID {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("invalid node type, skipping")
continue
@@ -331,14 +345,14 @@ func (fs *Decomposedfs) listTrashRoot(ctx context.Context, spaceID string) ([]*p
}
// RestoreRecycleItem restores the specified item
func (fs *Decomposedfs) RestoreRecycleItem(ctx context.Context, ref *provider.Reference, key, relativePath string, restoreRef *provider.Reference) error {
func (tb *DecomposedfsTrashbin) RestoreRecycleItem(ctx context.Context, ref *provider.Reference, key, relativePath string, restoreRef *provider.Reference) error {
if ref == nil {
return errtypes.BadRequest("missing reference, needs a space id")
}
var targetNode *node.Node
if restoreRef != nil {
tn, err := fs.lu.NodeFromResource(ctx, restoreRef)
tn, err := tb.fs.lu.NodeFromResource(ctx, restoreRef)
if err != nil {
return err
}
@@ -346,13 +360,13 @@ func (fs *Decomposedfs) RestoreRecycleItem(ctx context.Context, ref *provider.Re
targetNode = tn
}
rn, parent, restoreFunc, err := fs.tp.RestoreRecycleItemFunc(ctx, ref.ResourceId.SpaceId, key, relativePath, targetNode)
rn, parent, restoreFunc, err := tb.fs.tp.RestoreRecycleItemFunc(ctx, ref.ResourceId.SpaceId, key, relativePath, targetNode)
if err != nil {
return err
}
// check permissions of deleted node
rp, err := fs.p.AssembleTrashPermissions(ctx, rn)
rp, err := tb.fs.p.AssembleTrashPermissions(ctx, rn)
switch {
case err != nil:
return err
@@ -367,7 +381,7 @@ func (fs *Decomposedfs) RestoreRecycleItem(ctx context.Context, ref *provider.Re
storagespace.ContextSendSpaceOwnerID(ctx, rn.SpaceOwnerOrManager(ctx))
// check we can write to the parent of the restore reference
pp, err := fs.p.AssemblePermissions(ctx, parent)
pp, err := tb.fs.p.AssemblePermissions(ctx, parent)
switch {
case err != nil:
return err
@@ -384,12 +398,12 @@ func (fs *Decomposedfs) RestoreRecycleItem(ctx context.Context, ref *provider.Re
}
// PurgeRecycleItem purges the specified item, all its children and all their revisions
func (fs *Decomposedfs) PurgeRecycleItem(ctx context.Context, ref *provider.Reference, key, relativePath string) error {
func (tb *DecomposedfsTrashbin) PurgeRecycleItem(ctx context.Context, ref *provider.Reference, key, relativePath string) error {
if ref == nil {
return errtypes.BadRequest("missing reference, needs a space id")
}
rn, purgeFunc, err := fs.tp.PurgeRecycleItemFunc(ctx, ref.ResourceId.OpaqueId, key, relativePath)
rn, purgeFunc, err := tb.fs.tp.PurgeRecycleItemFunc(ctx, ref.ResourceId.OpaqueId, key, relativePath)
if err != nil {
if errors.Is(err, iofs.ErrNotExist) {
return errtypes.NotFound(key)
@@ -398,7 +412,7 @@ func (fs *Decomposedfs) PurgeRecycleItem(ctx context.Context, ref *provider.Refe
}
// check permissions of deleted node
rp, err := fs.p.AssembleTrashPermissions(ctx, rn)
rp, err := tb.fs.p.AssembleTrashPermissions(ctx, rn)
switch {
case err != nil:
return err
@@ -414,26 +428,26 @@ func (fs *Decomposedfs) PurgeRecycleItem(ctx context.Context, ref *provider.Refe
}
// EmptyRecycle empties the trash
func (fs *Decomposedfs) EmptyRecycle(ctx context.Context, ref *provider.Reference) error {
func (tb *DecomposedfsTrashbin) EmptyRecycle(ctx context.Context, ref *provider.Reference) error {
if ref == nil || ref.ResourceId == nil || ref.ResourceId.OpaqueId == "" {
return errtypes.BadRequest("spaceid must be set")
}
items, err := fs.ListRecycle(ctx, ref, "", "")
items, err := tb.ListRecycle(ctx, ref, "", "")
if err != nil {
return err
}
for _, i := range items {
if err := fs.PurgeRecycleItem(ctx, ref, i.Key, ""); err != nil {
if err := tb.PurgeRecycleItem(ctx, ref, i.Key, ""); err != nil {
return err
}
}
// TODO what permission should we check? we could check the root node of the user? or the owner permissions on his home root node?
// The current impl will wipe your own trash. or when no user provided the trash of 'root'
return os.RemoveAll(fs.getRecycleRoot(ref.ResourceId.SpaceId))
return os.RemoveAll(tb.getRecycleRoot(ref.ResourceId.SpaceId))
}
func (fs *Decomposedfs) getRecycleRoot(spaceID string) string {
return filepath.Join(fs.getSpaceRoot(spaceID), "trash")
func (tb *DecomposedfsTrashbin) getRecycleRoot(spaceID string) string {
return filepath.Join(tb.fs.getSpaceRoot(spaceID), "trash")
}

View File

@@ -86,7 +86,7 @@ func (fs *Decomposedfs) ListRevisions(ctx context.Context, ref *provider.Referen
Key: n.ID + node.RevisionIDDelimiter + parts[1],
Mtime: uint64(mtime.Unix()),
}
blobSize, err := fs.lu.ReadBlobSizeAttr(ctx, items[i])
_, blobSize, err := fs.lu.ReadBlobIDAndSizeAttr(ctx, items[i], nil)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Str("name", fi.Name()).Msg("error reading blobsize xattr, using 0")
}
@@ -148,13 +148,9 @@ func (fs *Decomposedfs) DownloadRevision(ctx context.Context, ref *provider.Refe
contentPath := fs.lu.InternalPath(spaceID, revisionKey)
blobid, err := fs.lu.ReadBlobIDAttr(ctx, contentPath)
blobid, blobsize, err := fs.lu.ReadBlobIDAndSizeAttr(ctx, contentPath, nil)
if err != nil {
return nil, errors.Wrapf(err, "Decomposedfs: could not read blob id of revision '%s' for node '%s'", n.ID, revisionKey)
}
blobsize, err := fs.lu.ReadBlobSizeAttr(ctx, contentPath)
if err != nil {
return nil, errors.Wrapf(err, "Decomposedfs: could not read blob size of revision '%s' for node '%s'", n.ID, revisionKey)
return nil, errors.Wrapf(err, "Decomposedfs: could not read blob id and size for revision '%s' of node '%s'", n.ID, revisionKey)
}
revisionNode := node.Node{SpaceID: spaceID, BlobID: blobid, Blobsize: blobsize} // blobsize is needed for the s3ng blobstore
@@ -238,7 +234,7 @@ func (fs *Decomposedfs) RestoreRevision(ctx context.Context, ref *provider.Refer
if err := os.Remove(newRevisionPath); err != nil {
log.Error().Err(err).Str("revision", filepath.Base(newRevisionPath)).Msg("could not clean up revision node")
}
if err := fs.lu.MetadataBackend().Purge(newRevisionPath); err != nil {
if err := fs.lu.MetadataBackend().Purge(ctx, newRevisionPath); err != nil {
log.Error().Err(err).Str("revision", filepath.Base(newRevisionPath)).Msg("could not clean up revision node")
}
}
@@ -299,7 +295,7 @@ func (fs *Decomposedfs) RestoreRevision(ctx context.Context, ref *provider.Refer
if err := os.Remove(fs.lu.MetadataBackend().LockfilePath(restoredRevisionPath)); err != nil {
log.Warn().Err(err).Interface("ref", ref).Str("originalnode", kp[0]).Str("revisionKey", revisionKey).Msg("could not delete old revision metadata lockfile, continuing")
}
if err := fs.lu.MetadataBackend().Purge(restoredRevisionPath); err != nil {
if err := fs.lu.MetadataBackend().Purge(ctx, restoredRevisionPath); err != nil {
log.Warn().Err(err).Interface("ref", ref).Str("originalnode", kp[0]).Str("revisionKey", revisionKey).Msg("could not purge old revision from cache, continuing")
}

View File

@@ -739,7 +739,7 @@ func (fs *Decomposedfs) DeleteStorageSpace(ctx context.Context, req *provider.De
}
// invalidate cache
if err := fs.lu.MetadataBackend().Purge(n.InternalPath()); err != nil {
if err := fs.lu.MetadataBackend().Purge(ctx, n.InternalPath()); err != nil {
return err
}

View File

@@ -0,0 +1,127 @@
// Copyright 2018-2024 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
package timemanager
import (
"context"
"os"
"time"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
)
// Manager is responsible for managing time-related attributes of nodes in a decomposed file system.
type Manager struct {
}
// OverrideMtime overrides the modification time (mtime) attribute of a node with the given time.
func (m *Manager) OverrideMtime(ctx context.Context, _ *node.Node, attrs *node.Attributes, mtime time.Time) error {
attrs.SetString(prefixes.MTimeAttr, mtime.UTC().Format(time.RFC3339Nano))
return nil
}
// MTime retrieves the modification time (mtime) attribute of a node.
// If the attribute is not set, it falls back to the file's last modification time.
func (dtm *Manager) MTime(ctx context.Context, n *node.Node) (time.Time, error) {
b, err := n.XattrString(ctx, prefixes.MTimeAttr)
if err != nil {
fi, err := os.Lstat(n.InternalPath())
if err != nil {
return time.Time{}, err
}
return fi.ModTime(), nil
}
return time.Parse(time.RFC3339Nano, b)
}
// SetMTime sets the modification time (mtime) attribute of a node to the given time.
// If the time is nil, the attribute is removed.
func (dtm *Manager) SetMTime(ctx context.Context, n *node.Node, mtime *time.Time) error {
if mtime == nil {
return n.RemoveXattr(ctx, prefixes.MTimeAttr, true)
}
return n.SetXattrString(ctx, prefixes.MTimeAttr, mtime.UTC().Format(time.RFC3339Nano))
}
// TMTime retrieves the tree modification time (tmtime) attribute of a node.
// If the attribute is not set, it falls back to the node's modification time (mtime).
func (dtm *Manager) TMTime(ctx context.Context, n *node.Node) (time.Time, error) {
b, err := n.XattrString(ctx, prefixes.TreeMTimeAttr)
if err == nil {
return time.Parse(time.RFC3339Nano, b)
}
// no tmtime, use mtime
return dtm.MTime(ctx, n)
}
// SetTMTime sets the tree modification time (tmtime) attribute of a node to the given time.
// If the time is nil, the attribute is removed.
func (dtm *Manager) SetTMTime(ctx context.Context, n *node.Node, tmtime *time.Time) error {
if tmtime == nil {
return n.RemoveXattr(ctx, prefixes.TreeMTimeAttr, true)
}
return n.SetXattrString(ctx, prefixes.TreeMTimeAttr, tmtime.UTC().Format(time.RFC3339Nano))
}
// CTime retrieves the creation time (ctime) attribute of a node.
// Since decomposedfs does not differentiate between ctime and mtime, it falls back to the node's modification time (mtime).
func (dtm *Manager) CTime(ctx context.Context, n *node.Node) (time.Time, error) {
// decomposedfs does not differentiate between ctime and mtime
return dtm.MTime(ctx, n)
}
// SetCTime sets the creation time (ctime) attribute of a node to the given time.
// Since decomposedfs does not differentiate between ctime and mtime, it sets the modification time (mtime) instead.
func (dtm *Manager) SetCTime(ctx context.Context, n *node.Node, mtime *time.Time) error {
// decomposedfs does not differentiate between ctime and mtime
return dtm.SetMTime(ctx, n, mtime)
}
// TCTime retrieves the tree creation time (tctime) attribute of a node.
// Since decomposedfs does not differentiate between ctime and mtime, it falls back to the tree modification time (tmtime).
func (dtm *Manager) TCTime(ctx context.Context, n *node.Node) (time.Time, error) {
// decomposedfs does not differentiate between ctime and mtime
return dtm.TMTime(ctx, n)
}
// SetTCTime sets the tree creation time (tctime) attribute of a node to the given time.
// Since decomposedfs does not differentiate between ctime and mtime, it sets the tree modification time (tmtime) instead.
func (dtm *Manager) SetTCTime(ctx context.Context, n *node.Node, tmtime *time.Time) error {
// decomposedfs does not differentiate between ctime and mtime
return dtm.SetTMTime(ctx, n, tmtime)
}
// DTime retrieves the deletion time (dtime) attribute of a node.
func (dtm *Manager) DTime(ctx context.Context, n *node.Node) (tmTime time.Time, err error) {
b, err := n.XattrString(ctx, prefixes.DTimeAttr)
if err != nil {
return time.Time{}, err
}
return time.Parse(time.RFC3339Nano, b)
}
// SetDTime sets the deletion time (dtime) attribute of a node to the given time.
// If the time is nil, the attribute is removed.
func (dtm *Manager) SetDTime(ctx context.Context, n *node.Node, t *time.Time) (err error) {
if t == nil {
return n.RemoveXattr(ctx, prefixes.DTimeAttr, true)
}
return n.SetXattrString(ctx, prefixes.DTimeAttr, t.UTC().Format(time.RFC3339Nano))
}

View File

@@ -0,0 +1,35 @@
// Copyright 2018-2024 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
package trashbin
import (
"context"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/storage"
)
type Trashbin interface {
Setup(storage.FS) error
ListRecycle(ctx context.Context, ref *provider.Reference, key, relativePath string) ([]*provider.RecycleItem, error)
RestoreRecycleItem(ctx context.Context, ref *provider.Reference, key, relativePath string, restoreRef *provider.Reference) error
PurgeRecycleItem(ctx context.Context, ref *provider.Reference, key, relativePath string) error
EmptyRecycle(ctx context.Context, ref *provider.Reference) error
}

View File

@@ -733,7 +733,7 @@ func (t *Tree) removeNode(ctx context.Context, path, timeSuffix string, n *node.
return err
}
if err := t.lookup.MetadataBackend().Purge(path); err != nil {
if err := t.lookup.MetadataBackend().Purge(ctx, path); err != nil {
logger.Error().Err(err).Str("path", t.lookup.MetadataBackend().MetadataPath(path)).Msg("error purging node metadata")
return err
}
@@ -757,7 +757,7 @@ func (t *Tree) removeNode(ctx context.Context, path, timeSuffix string, n *node.
continue
}
bID, err := t.lookup.ReadBlobIDAttr(ctx, rev)
bID, _, err := t.lookup.ReadBlobIDAndSizeAttr(ctx, rev, nil)
if err != nil {
logger.Error().Err(err).Str("revision", rev).Msg("error reading blobid attribute")
return err

View File

@@ -255,15 +255,8 @@ func (store OcisStore) CreateNodeForUpload(session *OcisSession, initAttrs node.
return nil, err
}
mtime := time.Now()
if !session.MTime().IsZero() {
// overwrite mtime if requested
mtime = session.MTime()
}
// overwrite technical information
initAttrs.SetString(prefixes.IDAttr, n.ID)
initAttrs.SetString(prefixes.MTimeAttr, mtime.UTC().Format(time.RFC3339Nano))
initAttrs.SetInt64(prefixes.TypeAttr, int64(provider.ResourceType_RESOURCE_TYPE_FILE))
initAttrs.SetString(prefixes.ParentidAttr, n.ParentID)
initAttrs.SetString(prefixes.NameAttr, n.Name)
@@ -271,6 +264,17 @@ func (store OcisStore) CreateNodeForUpload(session *OcisSession, initAttrs node.
initAttrs.SetInt64(prefixes.BlobsizeAttr, n.Blobsize)
initAttrs.SetString(prefixes.StatusPrefix, node.ProcessingStatus+session.ID())
// set mtime on the new node
mtime := time.Now()
if !session.MTime().IsZero() {
// overwrite mtime if requested
mtime = session.MTime()
}
err = store.lu.TimeManager().OverrideMtime(ctx, n, &initAttrs, mtime)
if err != nil {
return nil, errors.Wrap(err, "Decomposedfs: failed to set the mtime")
}
// update node metadata with new blobid etc
err = n.SetXattrsWithContext(ctx, initAttrs, false)
if err != nil {
@@ -367,7 +371,7 @@ func (store OcisStore) updateExistingNode(ctx context.Context, session *OcisSess
}
// delete old blob
bID, err := session.store.lu.ReadBlobIDAttr(ctx, versionPath)
bID, _, err := session.store.lu.ReadBlobIDAndSizeAttr(ctx, versionPath, nil)
if err != nil {
return unlock, err
}

View File

@@ -199,6 +199,7 @@ func (handler *UnroutedHandler) Middleware(h http.Handler) http.Handler {
if origin := r.Header.Get("Origin"); !cors.Disable && origin != "" {
originIsAllowed := cors.AllowOrigin.MatchString(origin)
if !originIsAllowed {
fmt.Println("ORIGIN IS NOT ALLOWED", origin)
handler.sendError(c, ErrOriginNotAllowed)
return
}
@@ -691,12 +692,14 @@ func (handler *UnroutedHandler) HeadFile(w http.ResponseWriter, r *http.Request)
// PatchFile adds a chunk to an upload. This operation is only allowed
// if enough space in the upload is left.
func (handler *UnroutedHandler) PatchFile(w http.ResponseWriter, r *http.Request) {
fmt.Println("PATCH FILE")
c := handler.getContext(w, r)
isTusV1 := !handler.usesIETFDraft(r)
// Check for presence of application/offset+octet-stream
if isTusV1 && r.Header.Get("Content-Type") != "application/offset+octet-stream" {
fmt.Println("WRONG CONTENT TYPE")
handler.sendError(c, ErrInvalidContentType)
return
}
@@ -704,12 +707,14 @@ func (handler *UnroutedHandler) PatchFile(w http.ResponseWriter, r *http.Request
// Check for presence of a valid Upload-Offset Header
offset, err := strconv.ParseInt(r.Header.Get("Upload-Offset"), 10, 64)
if err != nil || offset < 0 {
fmt.Println("WRONG OFFSET")
handler.sendError(c, ErrInvalidOffset)
return
}
id, err := extractIDFromPath(r.URL.Path)
if err != nil {
fmt.Println("WRONG ID")
handler.sendError(c, err)
return
}
@@ -718,6 +723,7 @@ func (handler *UnroutedHandler) PatchFile(w http.ResponseWriter, r *http.Request
if handler.composer.UsesLocker {
lock, err := handler.lockUpload(c, id)
if err != nil {
fmt.Println("WRONG LOCK")
handler.sendError(c, err)
return
}
@@ -727,23 +733,27 @@ func (handler *UnroutedHandler) PatchFile(w http.ResponseWriter, r *http.Request
upload, err := handler.composer.Core.GetUpload(c, id)
if err != nil {
fmt.Println("WRONG UPLOAD")
handler.sendError(c, err)
return
}
info, err := upload.GetInfo(c)
if err != nil {
fmt.Println("WRONG INFO")
handler.sendError(c, err)
return
}
// Modifying a final upload is not allowed
if info.IsFinal {
fmt.Println("WRONG FINAL")
handler.sendError(c, ErrModifyFinal)
return
}
if offset != info.Offset {
fmt.Println("WRONG INFO OFFSET")
handler.sendError(c, ErrMismatchOffset)
return
}
@@ -760,27 +770,32 @@ func (handler *UnroutedHandler) PatchFile(w http.ResponseWriter, r *http.Request
// Do not proxy the call to the data store if the upload is already completed
if !info.SizeIsDeferred && info.Offset == info.Size {
resp.Header["Upload-Offset"] = strconv.FormatInt(offset, 10)
fmt.Println("UPLOAD ALREADY COMPLETED")
handler.sendResp(c, resp)
return
}
if r.Header.Get("Upload-Length") != "" {
if !handler.composer.UsesLengthDeferrer {
fmt.Println("UPLOAD LENGTH DEFERRER")
handler.sendError(c, ErrNotImplemented)
return
}
if !info.SizeIsDeferred {
fmt.Println("UPLOAD LENGTH NOT DEFERED")
handler.sendError(c, ErrInvalidUploadLength)
return
}
uploadLength, err := strconv.ParseInt(r.Header.Get("Upload-Length"), 10, 64)
if err != nil || uploadLength < 0 || uploadLength < info.Offset || (handler.config.MaxSize > 0 && uploadLength > handler.config.MaxSize) {
fmt.Println("UPLOAD LENGTH INVALID")
handler.sendError(c, ErrInvalidUploadLength)
return
}
lengthDeclarableUpload := handler.composer.LengthDeferrer.AsLengthDeclarableUpload(upload)
if err := lengthDeclarableUpload.DeclareLength(c, uploadLength); err != nil {
fmt.Println("UPLOAD LENGTH DECLARED")
handler.sendError(c, err)
return
}
@@ -791,6 +806,7 @@ func (handler *UnroutedHandler) PatchFile(w http.ResponseWriter, r *http.Request
resp, err = handler.writeChunk(c, resp, upload, info)
if err != nil {
fmt.Println("CANT WRITE CHUNK")
handler.sendError(c, err)
return
}
@@ -799,6 +815,7 @@ func (handler *UnroutedHandler) PatchFile(w http.ResponseWriter, r *http.Request
if willCompleteUpload && info.SizeIsDeferred {
info, err = upload.GetInfo(c)
if err != nil {
fmt.Println("CANT GET INFO")
handler.sendError(c, err)
return
}
@@ -807,6 +824,7 @@ func (handler *UnroutedHandler) PatchFile(w http.ResponseWriter, r *http.Request
lengthDeclarableUpload := handler.composer.LengthDeferrer.AsLengthDeclarableUpload(upload)
if err := lengthDeclarableUpload.DeclareLength(c, uploadLength); err != nil {
fmt.Println("CANT UPLOAD LENGTH")
handler.sendError(c, err)
return
}
@@ -816,12 +834,14 @@ func (handler *UnroutedHandler) PatchFile(w http.ResponseWriter, r *http.Request
resp, err = handler.finishUploadIfComplete(c, resp, upload, info)
if err != nil {
fmt.Println("CANT COMPLETE")
handler.sendError(c, err)
return
}
}
handler.sendResp(c, resp)
fmt.Println("PATCH COMPLETE")
}
// writeChunk reads the body from the requests r and appends it to the upload

6
vendor/modules.txt vendored
View File

@@ -367,7 +367,7 @@ github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1
github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1
github.com/cs3org/go-cs3apis/cs3/tx/v1beta1
github.com/cs3org/go-cs3apis/cs3/types/v1beta1
# github.com/cs3org/reva/v2 v2.24.1
# github.com/cs3org/reva/v2 v2.24.2-0.20240917121936-fb394587b472
## explicit; go 1.21
github.com/cs3org/reva/v2/cmd/revad/internal/grace
github.com/cs3org/reva/v2/cmd/revad/runtime
@@ -670,6 +670,8 @@ github.com/cs3org/reva/v2/pkg/storage/fs/posix
github.com/cs3org/reva/v2/pkg/storage/fs/posix/blobstore
github.com/cs3org/reva/v2/pkg/storage/fs/posix/lookup
github.com/cs3org/reva/v2/pkg/storage/fs/posix/options
github.com/cs3org/reva/v2/pkg/storage/fs/posix/timemanager
github.com/cs3org/reva/v2/pkg/storage/fs/posix/trashbin
github.com/cs3org/reva/v2/pkg/storage/fs/posix/tree
github.com/cs3org/reva/v2/pkg/storage/fs/registry
github.com/cs3org/reva/v2/pkg/storage/fs/s3
@@ -693,6 +695,8 @@ github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node
github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options
github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/permissions
github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/spaceidindex
github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/timemanager
github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/trashbin
github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree
github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/propagator
github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload