Merge pull request #7859 from owncloud/exctract-uploadsessionlister-interface

[full-ci] use UploadSessionLister interface
This commit is contained in:
Michael Barz
2023-12-04 14:03:29 +01:00
committed by GitHub
10 changed files with 373 additions and 164 deletions

2
go.mod
View File

@@ -13,7 +13,7 @@ require (
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/coreos/go-oidc/v3 v3.8.0
github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781
github.com/cs3org/reva/v2 v2.16.1-0.20231128104331-ea8d1336afc9
github.com/cs3org/reva/v2 v2.16.1-0.20231201122033-a389ddc645c4
github.com/dhowden/tag v0.0.0-20230630033851-978a0926ee25
github.com/disintegration/imaging v1.6.2
github.com/dutchcoders/go-clamd v0.0.0-20170520113014-b970184f4d9e

4
go.sum
View File

@@ -1017,8 +1017,8 @@ github.com/crewjam/saml v0.4.14 h1:g9FBNx62osKusnFzs3QTN5L9CVA/Egfgm+stJShzw/c=
github.com/crewjam/saml v0.4.14/go.mod h1:UVSZCf18jJkk6GpWNVqcyQJMD5HsRugBPf4I1nl2mME=
github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781 h1:BUdwkIlf8IS2FasrrPg8gGPHQPOrQ18MS1Oew2tmGtY=
github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
github.com/cs3org/reva/v2 v2.16.1-0.20231128104331-ea8d1336afc9 h1:5vKQcL1hPHEZKu9e8C9rl0ap3ofMBznmoSgi4lRYXec=
github.com/cs3org/reva/v2 v2.16.1-0.20231128104331-ea8d1336afc9/go.mod h1:zcrrYVsBv/DwhpyO2/W5hoSZ/k6az6Z2EYQok65uqZY=
github.com/cs3org/reva/v2 v2.16.1-0.20231201122033-a389ddc645c4 h1:61AwMfov2OxrUElWXXKHZfBsuxgNIVwZVQW4PlJoqnM=
github.com/cs3org/reva/v2 v2.16.1-0.20231201122033-a389ddc645c4/go.mod h1:zcrrYVsBv/DwhpyO2/W5hoSZ/k6az6Z2EYQok65uqZY=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=

View File

@@ -3,11 +3,8 @@ package command
import (
"fmt"
"os"
"strconv"
"sync"
"time"
tusd "github.com/tus/tusd/pkg/handler"
"github.com/urfave/cli/v2"
"github.com/cs3org/reva/v2/pkg/storage"
@@ -51,20 +48,21 @@ func ListUploads(cfg *config.Config) *cli.Command {
return err
}
managingFS, ok := fs.(storage.UploadsManager)
managingFS, ok := fs.(storage.UploadSessionLister)
if !ok {
fmt.Fprintf(os.Stderr, "'%s' storage does not support listing expired uploads\n", cfg.Driver)
os.Exit(1)
}
uploads, err := managingFS.ListUploads()
falseValue := false
uploads, err := managingFS.ListUploadSessions(c.Context, storage.UploadSessionFilter{Expired: &falseValue})
if err != nil {
return err
}
fmt.Println("Incomplete uploads:")
for _, u := range uploads {
fmt.Printf(" - %s (%s, Size: %d, Expires: %s)\n", u.ID, u.MetaData["filename"], u.Size, expiredString(u.MetaData["expires"]))
ref := u.Reference()
fmt.Printf(" - %s (Space: %s, Name: %s, Size: %d/%d, Expires: %s, Processing: %t)\n", ref.GetResourceId().GetSpaceId(), u.ID(), u.Filename(), u.Offset(), u.Size(), u.Expires(), u.IsProcessing())
}
return nil
},
@@ -92,7 +90,7 @@ func PurgeExpiredUploads(cfg *config.Config) *cli.Command {
return err
}
managingFS, ok := fs.(storage.UploadsManager)
managingFS, ok := fs.(storage.UploadSessionLister)
if !ok {
fmt.Fprintf(os.Stderr, "'%s' storage does not support clean expired uploads\n", cfg.Driver)
os.Exit(1)
@@ -100,18 +98,23 @@ func PurgeExpiredUploads(cfg *config.Config) *cli.Command {
wg := sync.WaitGroup{}
wg.Add(1)
purgedChannel := make(chan tusd.FileInfo)
falseValue := false
trueValue := false
uploads, err := managingFS.ListUploadSessions(c.Context, storage.UploadSessionFilter{Expired: &trueValue, Processing: &falseValue})
if err != nil {
return err
}
fmt.Println("Cleaned uploads:")
fmt.Println("purging uploads:")
go func() {
for purged := range purgedChannel {
fmt.Printf(" - %s (%s, Size: %d, Expires: %s)\n", purged.ID, purged.MetaData["filename"], purged.Size, expiredString(purged.MetaData["expires"]))
for _, u := range uploads {
ref := u.Reference()
fmt.Printf(" - %s (Space: %s, Name: %s, Size: %d/%d, Expires: %s, Processing: %t)\n", ref.GetResourceId().GetSpaceId(), u.ID(), u.Filename(), u.Offset(), u.Size(), u.Expires(), u.IsProcessing())
u.Purge(c.Context)
}
wg.Done()
}()
err = managingFS.PurgeExpiredUploads(purgedChannel)
close(purgedChannel)
wg.Wait()
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to clean expired uploads '%s'\n", err)
@@ -121,12 +124,3 @@ func PurgeExpiredUploads(cfg *config.Config) *cli.Command {
},
}
}
func expiredString(e string) string {
expired := "N/A"
iExpires, err := strconv.Atoi(e)
if err == nil {
expired = time.Unix(int64(iExpires), 0).Format(time.RFC3339)
}
return expired
}

View File

@@ -23,13 +23,11 @@ import (
"log"
"net/http"
"path"
"path/filepath"
"time"
"github.com/pkg/errors"
tusd "github.com/tus/tusd/pkg/handler"
userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/internal/http/services/owncloud/ocdav/net"
"github.com/cs3org/reva/v2/pkg/appctx"
@@ -40,8 +38,8 @@ import (
"github.com/cs3org/reva/v2/pkg/rhttp/datatx/metrics"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/cache"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/mitchellh/mapstructure"
)
@@ -103,33 +101,27 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
return nil, err
}
go func() {
for {
ev := <-handler.CompleteUploads
info := ev.Upload
spaceOwner := &userv1beta1.UserId{
OpaqueId: info.Storage["SpaceOwnerOrManager"],
}
owner := &userv1beta1.UserId{
Idp: info.Storage["Idp"],
OpaqueId: info.Storage["UserId"],
}
ref := &provider.Reference{
ResourceId: &provider.ResourceId{
StorageId: info.MetaData["providerID"],
SpaceId: info.Storage["SpaceRoot"],
OpaqueId: info.Storage["SpaceRoot"],
},
Path: utils.MakeRelativePath(filepath.Join(info.MetaData["dir"], info.MetaData["filename"])),
}
datatx.InvalidateCache(owner, ref, m.statCache)
if m.publisher != nil {
if err := datatx.EmitFileUploadedEvent(spaceOwner, owner, ref, m.publisher); err != nil {
appctx.GetLogger(context.Background()).Error().Err(err).Msg("failed to publish FileUploaded event")
if _, ok := fs.(storage.UploadSessionLister); ok {
// We can currently only send updates if the fs is decomposedfs as we read very specific keys from the storage map of the tus info
go func() {
for {
ev := <-handler.CompleteUploads
// We should be able to get the upload progress with fs.GetUploadProgress, but currently tus will erase the info files
// so we create a Progress instance here that is used to read the correct properties
up := upload.Progress{
Info: ev.Upload,
}
executant := up.Executant()
ref := up.Reference()
datatx.InvalidateCache(&executant, &ref, m.statCache)
if m.publisher != nil {
if err := datatx.EmitFileUploadedEvent(up.SpaceOwner(), &executant, &ref, m.publisher); err != nil {
appctx.GetLogger(context.Background()).Error().Err(err).Msg("failed to publish FileUploaded event")
}
}
}
}
}()
}()
}
h := handler.Middleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
method := r.Method

View File

@@ -23,16 +23,10 @@ import (
"io"
"net/url"
tusd "github.com/tus/tusd/pkg/handler"
userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
registry "github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1"
)
// UploadFinishedFunc is a callback function used in storage drivers to indicate that an upload has finished
type UploadFinishedFunc func(spaceOwner, owner *userpb.UserId, ref *provider.Reference)
// FS is the interface to implement access to the storage.
type FS interface {
GetHome(ctx context.Context) (string, error)
@@ -77,12 +71,6 @@ type FS interface {
DeleteStorageSpace(ctx context.Context, req *provider.DeleteStorageSpaceRequest) error
}
// UploadsManager defines the interface for FS implementations that allow for managing uploads
type UploadsManager interface {
ListUploads() ([]tusd.FileInfo, error)
PurgeExpiredUploads(chan<- tusd.FileInfo) error
}
// Registry is the interface that storage registries implement
// for discovering storage providers
type Registry interface {
@@ -98,9 +86,3 @@ type PathWrapper interface {
Unwrap(ctx context.Context, rp string) (string, error)
Wrap(ctx context.Context, rp string) (string, error)
}
type UploadRequest struct {
Ref *provider.Reference
Body io.ReadCloser
Length int64
}

View File

@@ -0,0 +1,86 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
package storage
import (
"context"
"io"
"time"
userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
tusd "github.com/tus/tusd/pkg/handler"
)
// UploadFinishedFunc is a callback function used in storage drivers to indicate that an upload has finished
type UploadFinishedFunc func(spaceOwner, executant *userpb.UserId, ref *provider.Reference)
// UploadRequest us used in FS.Upload() to carry required upload metadata
type UploadRequest struct {
Ref *provider.Reference
Body io.ReadCloser
Length int64
}
// UploadsManager defines the interface for storage drivers that allow for managing uploads
// Deprecated: No longer used. Storage drivers should implement the UploadSessionLister.
type UploadsManager interface {
ListUploads() ([]tusd.FileInfo, error)
PurgeExpiredUploads(chan<- tusd.FileInfo) error
}
// UploadSessionLister defines the interface for FS implementations that allow listing and purging upload sessions
type UploadSessionLister interface {
// ListUploadSessions returns the upload sessions matching the given filter
ListUploadSessions(ctx context.Context, filter UploadSessionFilter) ([]UploadSession, error)
}
// UploadSession is the interface that storage drivers need to return whan listing upload sessions.
type UploadSession interface {
// ID returns the upload id
ID() string
// Filename returns the filename of the file
Filename() string
// Size returns the size of the upload
Size() int64
// Offset returns the current offset
Offset() int64
// Reference returns a reference for the file being uploaded. May be absolute id based or relative to e.g. a space root
Reference() provider.Reference
// Executant returns the userid of the user that created the upload
Executant() userpb.UserId
// SpaceOwner returns the owner of a space if set. optional
SpaceOwner() *userpb.UserId
// Expires returns the time when the upload can no longer be used
Expires() time.Time
// IsProcessing returns true if postprocessing has not finished, yet
// The actual postprocessing state is tracked in the postprocessing service.
IsProcessing() bool
// Purge allows completely removing an upload. Should emit a PostprocessingFinished event with a Delete outcome
Purge(ctx context.Context) error
}
// UploadSessionFilter can be used to filter upload sessions
type UploadSessionFilter struct {
ID *string
Processing *bool
Expired *bool
}

View File

@@ -27,6 +27,9 @@ import (
"strings"
"time"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/cs3org/reva/v2/pkg/appctx"
@@ -35,7 +38,6 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/pkg/errors"
)
// Recycle items are stored inside the node folder and start with the uuid of the deleted node.
@@ -214,66 +216,111 @@ func readTrashLink(path string) (string, string, string, error) {
func (fs *Decomposedfs) listTrashRoot(ctx context.Context, spaceID string) ([]*provider.RecycleItem, error) {
log := appctx.GetLogger(ctx)
items := make([]*provider.RecycleItem, 0)
trashRoot := fs.getRecycleRoot(spaceID)
matches, err := filepath.Glob(trashRoot + "/*/*/*/*/*")
subTrees, err := filepath.Glob(trashRoot + "/*")
if err != nil {
return nil, err
}
for _, itemPath := range matches {
nodePath, nodeID, timeSuffix, err := readTrashLink(itemPath)
if err != nil {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Msg("error reading trash link, skipping")
continue
}
numWorkers := fs.o.MaxConcurrency
if len(subTrees) < numWorkers {
numWorkers = len(subTrees)
}
md, err := os.Stat(nodePath)
if err != nil {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not stat trash item, skipping")
continue
}
work := make(chan string, len(subTrees))
results := make(chan *provider.RecycleItem, len(subTrees))
attrs, err := fs.lu.MetadataBackend().All(ctx, nodePath)
if err != nil {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not get extended attributes, skipping")
continue
}
g, ctx := errgroup.WithContext(ctx)
nodeType := fs.lu.TypeFromPath(ctx, nodePath)
if nodeType == provider.ResourceType_RESOURCE_TYPE_INVALID {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("invalid node type, skipping")
continue
}
item := &provider.RecycleItem{
Type: nodeType,
Size: uint64(md.Size()),
Key: nodeID,
}
if deletionTime, err := time.Parse(time.RFC3339Nano, timeSuffix); err == nil {
item.DeletionTime = &types.Timestamp{
Seconds: uint64(deletionTime.Unix()),
// TODO nanos
// Distribute work
g.Go(func() error {
defer close(work)
for _, itemPath := range subTrees {
select {
case work <- itemPath:
case <-ctx.Done():
return ctx.Err()
}
} else {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not parse time format, ignoring")
}
return nil
})
// lookup origin path in extended attributes
if attr, ok := attrs[prefixes.TrashOriginAttr]; ok {
item.Ref = &provider.Reference{Path: string(attr)}
} else {
log.Error().Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not read origin path, skipping")
continue
}
// TODO filter results by permission ... on the original parent? or the trashed node?
// if it were on the original parent it would be possible to see files that were trashed before the current user got access
// so -> check the trash node itself
// hmm listing trash currently lists the current users trash or the 'root' trash. from ocs only the home storage is queried for trash items.
// for now we can only really check if the current user is the owner
items = append(items, item)
// Spawn workers that'll concurrently work the queue
for i := 0; i < numWorkers; i++ {
g.Go(func() error {
for subTree := range work {
matches, err := filepath.Glob(subTree + "/*/*/*/*")
if err != nil {
return err
}
for _, itemPath := range matches {
nodePath, nodeID, timeSuffix, err := readTrashLink(itemPath)
if err != nil {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Msg("error reading trash link, skipping")
continue
}
md, err := os.Stat(nodePath)
if err != nil {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not stat trash item, skipping")
continue
}
attrs, err := fs.lu.MetadataBackend().All(ctx, nodePath)
if err != nil {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not get extended attributes, skipping")
continue
}
nodeType := fs.lu.TypeFromPath(ctx, nodePath)
if nodeType == provider.ResourceType_RESOURCE_TYPE_INVALID {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("invalid node type, skipping")
continue
}
item := &provider.RecycleItem{
Type: nodeType,
Size: uint64(md.Size()),
Key: nodeID,
}
if deletionTime, err := time.Parse(time.RFC3339Nano, timeSuffix); err == nil {
item.DeletionTime = &types.Timestamp{
Seconds: uint64(deletionTime.Unix()),
// TODO nanos
}
} else {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not parse time format, ignoring")
}
// lookup origin path in extended attributes
if attr, ok := attrs[prefixes.TrashOriginAttr]; ok {
item.Ref = &provider.Reference{Path: string(attr)}
} else {
log.Error().Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not read origin path")
}
select {
case results <- item:
case <-ctx.Done():
return ctx.Err()
}
}
}
return nil
})
}
// Wait for things to settle down, then close results chan
go func() {
_ = g.Wait() // error is checked later
close(results)
}()
// Collect results
items := []*provider.RecycleItem{}
for ri := range results {
items = append(items, ri)
}
return items, nil
}

View File

@@ -20,10 +20,10 @@ package decomposedfs
import (
"context"
"fmt"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"
@@ -96,14 +96,14 @@ func (fs *Decomposedfs) Upload(ctx context.Context, req storage.UploadRequest, u
},
Path: utils.MakeRelativePath(filepath.Join(info.MetaData["dir"], info.MetaData["filename"])),
}
owner, ok := ctxpkg.ContextGetUser(uploadInfo.Ctx)
executant, ok := ctxpkg.ContextGetUser(uploadInfo.Ctx)
if !ok {
return provider.ResourceInfo{}, errtypes.PreconditionFailed("error getting user from uploadinfo context")
}
spaceOwner := &userpb.UserId{
OpaqueId: info.Storage["SpaceOwnerOrManager"],
}
uff(spaceOwner, owner.Id, uploadRef)
uff(spaceOwner, executant.Id, uploadRef)
}
ri := provider.ResourceInfo{
@@ -243,36 +243,42 @@ func (fs *Decomposedfs) GetUpload(ctx context.Context, id string) (tusd.Upload,
return upload.Get(ctx, id, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens)
}
// ListUploads returns a list of all incomplete uploads
func (fs *Decomposedfs) ListUploads() ([]tusd.FileInfo, error) {
return fs.uploadInfos(context.Background())
}
// PurgeExpiredUploads scans the fs for expired downloads and removes any leftovers
func (fs *Decomposedfs) PurgeExpiredUploads(purgedChan chan<- tusd.FileInfo) error {
infos, err := fs.uploadInfos(context.Background())
if err != nil {
return err
}
for _, info := range infos {
expires, err := strconv.Atoi(info.MetaData["expires"])
// ListUploadSessions returns the upload sessions for the given filter
func (fs *Decomposedfs) ListUploadSessions(ctx context.Context, filter storage.UploadSessionFilter) ([]storage.UploadSession, error) {
var sessions []storage.UploadSession
if filter.ID != nil && *filter.ID != "" {
session, err := fs.getUploadSession(ctx, filepath.Join(fs.o.Root, "uploads", *filter.ID+".info"))
if err != nil {
return nil, err
}
sessions = []storage.UploadSession{session}
} else {
var err error
sessions, err = fs.uploadSessions(ctx)
if err != nil {
return nil, err
}
}
filteredSessions := []storage.UploadSession{}
now := time.Now()
for _, session := range sessions {
if filter.Processing != nil && *filter.Processing != session.IsProcessing() {
continue
}
if int64(expires) < time.Now().Unix() {
purgedChan <- info
err = os.Remove(info.Storage["BinPath"])
if err != nil {
return err
}
err = os.Remove(filepath.Join(fs.o.Root, "uploads", info.ID+".info"))
if err != nil {
return err
if filter.Expired != nil {
if *filter.Expired {
if now.Before(session.Expires()) {
continue
}
} else {
if now.After(session.Expires()) {
continue
}
}
}
filteredSessions = append(filteredSessions, session)
}
return nil
return filteredSessions, nil
}
// AsTerminatableUpload returns a TerminatableUpload
@@ -296,28 +302,47 @@ func (fs *Decomposedfs) AsConcatableUpload(up tusd.Upload) tusd.ConcatableUpload
return up.(*upload.Upload)
}
func (fs *Decomposedfs) uploadInfos(ctx context.Context) ([]tusd.FileInfo, error) {
infos := []tusd.FileInfo{}
func (fs *Decomposedfs) uploadSessions(ctx context.Context) ([]storage.UploadSession, error) {
uploads := []storage.UploadSession{}
infoFiles, err := filepath.Glob(filepath.Join(fs.o.Root, "uploads", "*.info"))
if err != nil {
return nil, err
}
for _, info := range infoFiles {
match := _idRegexp.FindStringSubmatch(info)
if match == nil || len(match) < 2 {
progress, err := fs.getUploadSession(ctx, info)
if err != nil {
appctx.GetLogger(ctx).Error().Interface("path", info).Msg("Decomposedfs: could not getUploadSession")
continue
}
up, err := fs.GetUpload(ctx, match[1])
if err != nil {
return nil, err
}
info, err := up.GetInfo(context.Background())
if err != nil {
return nil, err
}
infos = append(infos, info)
uploads = append(uploads, progress)
}
return infos, nil
return uploads, nil
}
func (fs *Decomposedfs) getUploadSession(ctx context.Context, path string) (storage.UploadSession, error) {
match := _idRegexp.FindStringSubmatch(path)
if match == nil || len(match) < 2 {
return nil, fmt.Errorf("invalid upload path")
}
up, err := fs.GetUpload(ctx, match[1])
if err != nil {
return nil, err
}
info, err := up.GetInfo(context.Background())
if err != nil {
return nil, err
}
// upload processing state is stored in the node, for decomposedfs the NodeId is always set by InitiateUpload
n, err := node.ReadNode(ctx, fs.lu, info.Storage["SpaceRoot"], info.Storage["NodeId"], true, nil, true)
if err != nil {
return nil, err
}
progress := upload.Progress{
Path: path,
Info: info,
Processing: n.IsProcessing(ctx),
}
return progress, nil
}

View File

@@ -21,6 +21,7 @@ package upload
import (
"context"
"encoding/json"
stderrors "errors"
"fmt"
iofs "io/fs"
"os"
@@ -497,3 +498,85 @@ func lookupNode(ctx context.Context, spaceRoot *node.Node, path string, lu *look
}
return n, nil
}
// Progress adapts the persisted upload metadata for the UploadSessionLister interface
type Progress struct {
Path string
Info tusd.FileInfo
Processing bool
}
// ID implements the storage.UploadSession interface
func (p Progress) ID() string {
return p.Info.ID
}
// Filename implements the storage.UploadSession interface
func (p Progress) Filename() string {
return p.Info.MetaData["filename"]
}
// Size implements the storage.UploadSession interface
func (p Progress) Size() int64 {
return p.Info.Size
}
// Offset implements the storage.UploadSession interface
func (p Progress) Offset() int64 {
return p.Info.Offset
}
// Reference implements the storage.UploadSession interface
func (p Progress) Reference() provider.Reference {
return provider.Reference{
ResourceId: &provider.ResourceId{
StorageId: p.Info.MetaData["providerID"],
SpaceId: p.Info.Storage["SpaceRoot"],
OpaqueId: p.Info.Storage["NodeId"], // Node id is always set in InitiateUpload
},
}
}
// Executant implements the storage.UploadSession interface
func (p Progress) Executant() userpb.UserId {
return userpb.UserId{
Idp: p.Info.Storage["Idp"],
OpaqueId: p.Info.Storage["UserId"],
Type: utils.UserTypeMap(p.Info.Storage["UserType"]),
}
}
// SpaceOwner implements the storage.UploadSession interface
func (p Progress) SpaceOwner() *userpb.UserId {
return &userpb.UserId{
// idp and type do not seem to be consumed and the node currently only stores the user id anyway
OpaqueId: p.Info.Storage["SpaceOwnerOrManager"],
}
}
// Expires implements the storage.UploadSession interface
func (p Progress) Expires() time.Time {
mt, _ := utils.MTimeToTime(p.Info.MetaData["expires"])
return mt
}
// IsProcessing implements the storage.UploadSession interface
func (p Progress) IsProcessing() bool {
return p.Processing
}
// Purge implements the storage.UploadSession interface
func (p Progress) Purge(ctx context.Context) error {
berr := os.Remove(p.Info.Storage["BinPath"])
if berr != nil {
appctx.GetLogger(ctx).Error().Str("id", p.Info.ID).Interface("path", p.Info.Storage["BinPath"]).Msg("Decomposedfs: could not purge bin path for upload session")
}
// remove upload metadata
merr := os.Remove(p.Path)
if merr != nil {
appctx.GetLogger(ctx).Error().Str("id", p.Info.ID).Interface("path", p.Path).Msg("Decomposedfs: could not purge metadata path for upload session")
}
return stderrors.Join(berr, merr)
}

2
vendor/modules.txt vendored
View File

@@ -357,7 +357,7 @@ github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1
github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1
github.com/cs3org/go-cs3apis/cs3/tx/v1beta1
github.com/cs3org/go-cs3apis/cs3/types/v1beta1
# github.com/cs3org/reva/v2 v2.16.1-0.20231128104331-ea8d1336afc9
# github.com/cs3org/reva/v2 v2.16.1-0.20231201122033-a389ddc645c4
## explicit; go 1.20
github.com/cs3org/reva/v2/cmd/revad/internal/grace
github.com/cs3org/reva/v2/cmd/revad/runtime