bump reva to use UploadSessionLister interface

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
This commit is contained in:
Jörn Friedrich Dreyer
2023-11-30 17:27:42 +01:00
parent 0e8394e83a
commit 286382241a
9 changed files with 276 additions and 108 deletions

2
go.mod
View File

@@ -13,7 +13,7 @@ require (
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/coreos/go-oidc/v3 v3.6.0
github.com/cs3org/go-cs3apis v0.0.0-20230516150832-730ac860c71d
github.com/cs3org/reva/v2 v2.16.4-0.20231211121647-b269a07b70b2
github.com/cs3org/reva/v2 v2.16.4-0.20231212083844-00de22ceb749
github.com/disintegration/imaging v1.6.2
github.com/dutchcoders/go-clamd v0.0.0-20170520113014-b970184f4d9e
github.com/egirna/icap-client v0.1.1

4
go.sum
View File

@@ -864,8 +864,8 @@ github.com/crewjam/httperr v0.2.0 h1:b2BfXR8U3AlIHwNeFFvZ+BV1LFvKLlzMjzaTnZMybNo
github.com/crewjam/httperr v0.2.0/go.mod h1:Jlz+Sg/XqBQhyMjdDiC+GNNRzZTD7x39Gu3pglZ5oH4=
github.com/crewjam/saml v0.4.13 h1:TYHggH/hwP7eArqiXSJUvtOPNzQDyQ7vwmwEqlFWhMc=
github.com/crewjam/saml v0.4.13/go.mod h1:igEejV+fihTIlHXYP8zOec3V5A8y3lws5bQBFsTm4gA=
github.com/cs3org/reva/v2 v2.16.4-0.20231211121647-b269a07b70b2 h1:I6+bI04Kh0MoTSi/EnfkHqdr1HetFTxV3Sph5RIgTNg=
github.com/cs3org/reva/v2 v2.16.4-0.20231211121647-b269a07b70b2/go.mod h1:RvhuweTFqzezjUFU0SIdTXakrEx9vJlMvQ7znPXSP1g=
github.com/cs3org/reva/v2 v2.16.4-0.20231212083844-00de22ceb749 h1:oCktbSObMu5VTGwcux3tlabQwQT+e0CycuheKvzbQow=
github.com/cs3org/reva/v2 v2.16.4-0.20231212083844-00de22ceb749/go.mod h1:RvhuweTFqzezjUFU0SIdTXakrEx9vJlMvQ7znPXSP1g=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=

View File

@@ -3,11 +3,8 @@ package command
import (
"fmt"
"os"
"strconv"
"sync"
"time"
tusd "github.com/tus/tusd/pkg/handler"
"github.com/urfave/cli/v2"
"github.com/cs3org/reva/v2/pkg/storage"
@@ -51,20 +48,21 @@ func ListUploads(cfg *config.Config) *cli.Command {
return err
}
managingFS, ok := fs.(storage.UploadsManager)
managingFS, ok := fs.(storage.UploadSessionLister)
if !ok {
fmt.Fprintf(os.Stderr, "'%s' storage does not support listing expired uploads\n", cfg.Driver)
os.Exit(1)
}
uploads, err := managingFS.ListUploads()
falseValue := false
uploads, err := managingFS.ListUploadSessions(c.Context, storage.UploadSessionFilter{Expired: &falseValue})
if err != nil {
return err
}
fmt.Println("Incomplete uploads:")
for _, u := range uploads {
fmt.Printf(" - %s (%s, Size: %d, Expires: %s)\n", u.ID, u.MetaData["filename"], u.Size, expiredString(u.MetaData["expires"]))
ref := u.Reference()
fmt.Printf(" - %s (Space: %s, Name: %s, Size: %d/%d, Expires: %s, Processing: %t)\n", ref.GetResourceId().GetSpaceId(), u.ID(), u.Filename(), u.Offset(), u.Size(), u.Expires(), u.IsProcessing())
}
return nil
},
@@ -92,7 +90,7 @@ func PurgeExpiredUploads(cfg *config.Config) *cli.Command {
return err
}
managingFS, ok := fs.(storage.UploadsManager)
managingFS, ok := fs.(storage.UploadSessionLister)
if !ok {
fmt.Fprintf(os.Stderr, "'%s' storage does not support clean expired uploads\n", cfg.Driver)
os.Exit(1)
@@ -100,18 +98,23 @@ func PurgeExpiredUploads(cfg *config.Config) *cli.Command {
wg := sync.WaitGroup{}
wg.Add(1)
purgedChannel := make(chan tusd.FileInfo)
falseValue := false
trueValue := false
uploads, err := managingFS.ListUploadSessions(c.Context, storage.UploadSessionFilter{Expired: &trueValue, Processing: &falseValue})
if err != nil {
return err
}
fmt.Println("Cleaned uploads:")
fmt.Println("purging uploads:")
go func() {
for purged := range purgedChannel {
fmt.Printf(" - %s (%s, Size: %d, Expires: %s)\n", purged.ID, purged.MetaData["filename"], purged.Size, expiredString(purged.MetaData["expires"]))
for _, u := range uploads {
ref := u.Reference()
fmt.Printf(" - %s (Space: %s, Name: %s, Size: %d/%d, Expires: %s, Processing: %t)\n", ref.GetResourceId().GetSpaceId(), u.ID(), u.Filename(), u.Offset(), u.Size(), u.Expires(), u.IsProcessing())
u.Purge(c.Context)
}
wg.Done()
}()
err = managingFS.PurgeExpiredUploads(purgedChannel)
close(purgedChannel)
wg.Wait()
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to clean expired uploads '%s'\n", err)
@@ -121,12 +124,3 @@ func PurgeExpiredUploads(cfg *config.Config) *cli.Command {
},
}
}
func expiredString(e string) string {
expired := "N/A"
iExpires, err := strconv.Atoi(e)
if err == nil {
expired = time.Unix(int64(iExpires), 0).Format(time.RFC3339)
}
return expired
}

View File

@@ -23,13 +23,11 @@ import (
"log"
"net/http"
"path"
"path/filepath"
"time"
"github.com/pkg/errors"
tusd "github.com/tus/tusd/pkg/handler"
userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/internal/http/services/owncloud/ocdav/net"
"github.com/cs3org/reva/v2/pkg/appctx"
@@ -40,8 +38,8 @@ import (
"github.com/cs3org/reva/v2/pkg/rhttp/datatx/metrics"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/cache"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/mitchellh/mapstructure"
)
@@ -103,33 +101,27 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
return nil, err
}
go func() {
for {
ev := <-handler.CompleteUploads
info := ev.Upload
spaceOwner := &userv1beta1.UserId{
OpaqueId: info.Storage["SpaceOwnerOrManager"],
}
owner := &userv1beta1.UserId{
Idp: info.Storage["Idp"],
OpaqueId: info.Storage["UserId"],
}
ref := &provider.Reference{
ResourceId: &provider.ResourceId{
StorageId: info.MetaData["providerID"],
SpaceId: info.Storage["SpaceRoot"],
OpaqueId: info.Storage["SpaceRoot"],
},
Path: utils.MakeRelativePath(filepath.Join(info.MetaData["dir"], info.MetaData["filename"])),
}
datatx.InvalidateCache(owner, ref, m.statCache)
if m.publisher != nil {
if err := datatx.EmitFileUploadedEvent(spaceOwner, owner, ref, m.publisher); err != nil {
appctx.GetLogger(context.Background()).Error().Err(err).Msg("failed to publish FileUploaded event")
if _, ok := fs.(storage.UploadSessionLister); ok {
// We can currently only send updates if the fs is decomposedfs as we read very specific keys from the storage map of the tus info
go func() {
for {
ev := <-handler.CompleteUploads
// We should be able to get the upload progress with fs.GetUploadProgress, but currently tus will erase the info files
// so we create a Progress instance here that is used to read the correct properties
up := upload.Progress{
Info: ev.Upload,
}
executant := up.Executant()
ref := up.Reference()
datatx.InvalidateCache(&executant, &ref, m.statCache)
if m.publisher != nil {
if err := datatx.EmitFileUploadedEvent(up.SpaceOwner(), &executant, &ref, m.publisher); err != nil {
appctx.GetLogger(context.Background()).Error().Err(err).Msg("failed to publish FileUploaded event")
}
}
}
}
}()
}()
}
h := handler.Middleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
method := r.Method

View File

@@ -23,16 +23,10 @@ import (
"io"
"net/url"
tusd "github.com/tus/tusd/pkg/handler"
userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
registry "github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1"
)
// UploadFinishedFunc is a callback function used in storage drivers to indicate that an upload has finished
type UploadFinishedFunc func(spaceOwner, owner *userpb.UserId, ref *provider.Reference)
// FS is the interface to implement access to the storage.
type FS interface {
GetHome(ctx context.Context) (string, error)
@@ -77,12 +71,6 @@ type FS interface {
DeleteStorageSpace(ctx context.Context, req *provider.DeleteStorageSpaceRequest) error
}
// UploadsManager defines the interface for FS implementations that allow for managing uploads
type UploadsManager interface {
ListUploads() ([]tusd.FileInfo, error)
PurgeExpiredUploads(chan<- tusd.FileInfo) error
}
// Registry is the interface that storage registries implement
// for discovering storage providers
type Registry interface {

View File

@@ -0,0 +1,86 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
package storage
import (
"context"
"io"
"time"
userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
tusd "github.com/tus/tusd/pkg/handler"
)
// UploadFinishedFunc is a callback function used in storage drivers to indicate that an upload has finished
type UploadFinishedFunc func(spaceOwner, executant *userpb.UserId, ref *provider.Reference)
// UploadRequest us used in FS.Upload() to carry required upload metadata
type UploadRequest struct {
Ref *provider.Reference
Body io.ReadCloser
Length int64
}
// UploadsManager defines the interface for storage drivers that allow for managing uploads
// Deprecated: No longer used. Storage drivers should implement the UploadSessionLister.
type UploadsManager interface {
ListUploads() ([]tusd.FileInfo, error)
PurgeExpiredUploads(chan<- tusd.FileInfo) error
}
// UploadSessionLister defines the interface for FS implementations that allow listing and purging upload sessions
type UploadSessionLister interface {
// ListUploadSessions returns the upload sessions matching the given filter
ListUploadSessions(ctx context.Context, filter UploadSessionFilter) ([]UploadSession, error)
}
// UploadSession is the interface that storage drivers need to return whan listing upload sessions.
type UploadSession interface {
// ID returns the upload id
ID() string
// Filename returns the filename of the file
Filename() string
// Size returns the size of the upload
Size() int64
// Offset returns the current offset
Offset() int64
// Reference returns a reference for the file being uploaded. May be absolute id based or relative to e.g. a space root
Reference() provider.Reference
// Executant returns the userid of the user that created the upload
Executant() userpb.UserId
// SpaceOwner returns the owner of a space if set. optional
SpaceOwner() *userpb.UserId
// Expires returns the time when the upload can no longer be used
Expires() time.Time
// IsProcessing returns true if postprocessing has not finished, yet
// The actual postprocessing state is tracked in the postprocessing service.
IsProcessing() bool
// Purge allows completely removing an upload. Should emit a PostprocessingFinished event with a Delete outcome
Purge(ctx context.Context) error
}
// UploadSessionFilter can be used to filter upload sessions
type UploadSessionFilter struct {
ID *string
Processing *bool
Expired *bool
}

View File

@@ -20,11 +20,11 @@ package decomposedfs
import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"
@@ -97,14 +97,14 @@ func (fs *Decomposedfs) Upload(ctx context.Context, ref *provider.Reference, r i
},
Path: utils.MakeRelativePath(filepath.Join(info.MetaData["dir"], info.MetaData["filename"])),
}
owner, ok := ctxpkg.ContextGetUser(uploadInfo.Ctx)
executant, ok := ctxpkg.ContextGetUser(uploadInfo.Ctx)
if !ok {
return provider.ResourceInfo{}, errtypes.PreconditionFailed("error getting user from uploadinfo context")
}
spaceOwner := &userpb.UserId{
OpaqueId: info.Storage["SpaceOwnerOrManager"],
}
uff(spaceOwner, owner.Id, uploadRef)
uff(spaceOwner, executant.Id, uploadRef)
}
ri := provider.ResourceInfo{
@@ -244,36 +244,42 @@ func (fs *Decomposedfs) GetUpload(ctx context.Context, id string) (tusd.Upload,
return upload.Get(ctx, id, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens)
}
// ListUploads returns a list of all incomplete uploads
func (fs *Decomposedfs) ListUploads() ([]tusd.FileInfo, error) {
return fs.uploadInfos(context.Background())
}
// PurgeExpiredUploads scans the fs for expired downloads and removes any leftovers
func (fs *Decomposedfs) PurgeExpiredUploads(purgedChan chan<- tusd.FileInfo) error {
infos, err := fs.uploadInfos(context.Background())
if err != nil {
return err
}
for _, info := range infos {
expires, err := strconv.Atoi(info.MetaData["expires"])
// ListUploadSessions returns the upload sessions for the given filter
func (fs *Decomposedfs) ListUploadSessions(ctx context.Context, filter storage.UploadSessionFilter) ([]storage.UploadSession, error) {
var sessions []storage.UploadSession
if filter.ID != nil && *filter.ID != "" {
session, err := fs.getUploadSession(ctx, filepath.Join(fs.o.Root, "uploads", *filter.ID+".info"))
if err != nil {
return nil, err
}
sessions = []storage.UploadSession{session}
} else {
var err error
sessions, err = fs.uploadSessions(ctx)
if err != nil {
return nil, err
}
}
filteredSessions := []storage.UploadSession{}
now := time.Now()
for _, session := range sessions {
if filter.Processing != nil && *filter.Processing != session.IsProcessing() {
continue
}
if int64(expires) < time.Now().Unix() {
purgedChan <- info
err = os.Remove(info.Storage["BinPath"])
if err != nil {
return err
}
err = os.Remove(filepath.Join(fs.o.Root, "uploads", info.ID+".info"))
if err != nil {
return err
if filter.Expired != nil {
if *filter.Expired {
if now.Before(session.Expires()) {
continue
}
} else {
if now.After(session.Expires()) {
continue
}
}
}
filteredSessions = append(filteredSessions, session)
}
return nil
return filteredSessions, nil
}
// AsTerminatableUpload returns a TerminatableUpload
@@ -297,28 +303,47 @@ func (fs *Decomposedfs) AsConcatableUpload(up tusd.Upload) tusd.ConcatableUpload
return up.(*upload.Upload)
}
func (fs *Decomposedfs) uploadInfos(ctx context.Context) ([]tusd.FileInfo, error) {
infos := []tusd.FileInfo{}
func (fs *Decomposedfs) uploadSessions(ctx context.Context) ([]storage.UploadSession, error) {
uploads := []storage.UploadSession{}
infoFiles, err := filepath.Glob(filepath.Join(fs.o.Root, "uploads", "*.info"))
if err != nil {
return nil, err
}
for _, info := range infoFiles {
match := _idRegexp.FindStringSubmatch(info)
if match == nil || len(match) < 2 {
progress, err := fs.getUploadSession(ctx, info)
if err != nil {
appctx.GetLogger(ctx).Error().Interface("path", info).Msg("Decomposedfs: could not getUploadSession")
continue
}
up, err := fs.GetUpload(ctx, match[1])
if err != nil {
return nil, err
}
info, err := up.GetInfo(context.Background())
if err != nil {
return nil, err
}
infos = append(infos, info)
uploads = append(uploads, progress)
}
return infos, nil
return uploads, nil
}
func (fs *Decomposedfs) getUploadSession(ctx context.Context, path string) (storage.UploadSession, error) {
match := _idRegexp.FindStringSubmatch(path)
if match == nil || len(match) < 2 {
return nil, fmt.Errorf("invalid upload path")
}
up, err := fs.GetUpload(ctx, match[1])
if err != nil {
return nil, err
}
info, err := up.GetInfo(context.Background())
if err != nil {
return nil, err
}
// upload processing state is stored in the node, for decomposedfs the NodeId is always set by InitiateUpload
n, err := node.ReadNode(ctx, fs.lu, info.Storage["SpaceRoot"], info.Storage["NodeId"], true, nil, true)
if err != nil {
return nil, err
}
progress := upload.Progress{
Path: path,
Info: info,
Processing: n.IsProcessing(ctx),
}
return progress, nil
}

View File

@@ -21,6 +21,7 @@ package upload
import (
"context"
"encoding/json"
stderrors "errors"
"fmt"
iofs "io/fs"
"os"
@@ -493,3 +494,85 @@ func lookupNode(ctx context.Context, spaceRoot *node.Node, path string, lu *look
}
return n, nil
}
// Progress adapts the persisted upload metadata for the UploadSessionLister interface
type Progress struct {
Path string
Info tusd.FileInfo
Processing bool
}
// ID implements the storage.UploadSession interface
func (p Progress) ID() string {
return p.Info.ID
}
// Filename implements the storage.UploadSession interface
func (p Progress) Filename() string {
return p.Info.MetaData["filename"]
}
// Size implements the storage.UploadSession interface
func (p Progress) Size() int64 {
return p.Info.Size
}
// Offset implements the storage.UploadSession interface
func (p Progress) Offset() int64 {
return p.Info.Offset
}
// Reference implements the storage.UploadSession interface
func (p Progress) Reference() provider.Reference {
return provider.Reference{
ResourceId: &provider.ResourceId{
StorageId: p.Info.MetaData["providerID"],
SpaceId: p.Info.Storage["SpaceRoot"],
OpaqueId: p.Info.Storage["NodeId"], // Node id is always set in InitiateUpload
},
}
}
// Executant implements the storage.UploadSession interface
func (p Progress) Executant() userpb.UserId {
return userpb.UserId{
Idp: p.Info.Storage["Idp"],
OpaqueId: p.Info.Storage["UserId"],
Type: utils.UserTypeMap(p.Info.Storage["UserType"]),
}
}
// SpaceOwner implements the storage.UploadSession interface
func (p Progress) SpaceOwner() *userpb.UserId {
return &userpb.UserId{
// idp and type do not seem to be consumed and the node currently only stores the user id anyway
OpaqueId: p.Info.Storage["SpaceOwnerOrManager"],
}
}
// Expires implements the storage.UploadSession interface
func (p Progress) Expires() time.Time {
mt, _ := utils.MTimeToTime(p.Info.MetaData["expires"])
return mt
}
// IsProcessing implements the storage.UploadSession interface
func (p Progress) IsProcessing() bool {
return p.Processing
}
// Purge implements the storage.UploadSession interface
func (p Progress) Purge(ctx context.Context) error {
berr := os.Remove(p.Info.Storage["BinPath"])
if berr != nil {
appctx.GetLogger(ctx).Error().Str("id", p.Info.ID).Interface("path", p.Info.Storage["BinPath"]).Msg("Decomposedfs: could not purge bin path for upload session")
}
// remove upload metadata
merr := os.Remove(p.Path)
if merr != nil {
appctx.GetLogger(ctx).Error().Str("id", p.Info.ID).Interface("path", p.Path).Msg("Decomposedfs: could not purge metadata path for upload session")
}
return stderrors.Join(berr, merr)
}

2
vendor/modules.txt vendored
View File

@@ -354,7 +354,7 @@ github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1
github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1
github.com/cs3org/go-cs3apis/cs3/tx/v1beta1
github.com/cs3org/go-cs3apis/cs3/types/v1beta1
# github.com/cs3org/reva/v2 v2.16.4-0.20231211121647-b269a07b70b2
# github.com/cs3org/reva/v2 v2.16.4-0.20231212083844-00de22ceb749
## explicit; go 1.20
github.com/cs3org/reva/v2/cmd/revad/internal/grace
github.com/cs3org/reva/v2/cmd/revad/runtime