Bump reva

This commit is contained in:
André Duffeck
2025-03-12 15:54:06 +01:00
parent 747df432d0
commit b6db5f7677
80 changed files with 1507 additions and 2685 deletions

View File

@@ -53,7 +53,7 @@ func New(root string) (*Blobstore, error) {
}
// Upload stores some data in the blobstore under the given key
func (bs *Blobstore) Upload(node *node.Node, source string) error {
func (bs *Blobstore) Upload(node *node.Node, source, _copyTarget string) error {
if node.BlobID == "" {
return ErrBlobIDEmpty
}

View File

@@ -77,7 +77,7 @@ func New(endpoint, region, bucket, accessKey, secretKey string, defaultPutOption
}
// Upload stores some data in the blobstore under the given key
func (bs *Blobstore) Upload(node *node.Node, source string) error {
func (bs *Blobstore) Upload(node *node.Node, source, _copyTarget string) error {
reader, err := os.Open(source)
if err != nil {
return errors.Wrap(err, "can not open source file to upload")

View File

@@ -81,7 +81,11 @@ func (bs *Blobstore) Upload(node *node.Node, source, copyTarget string) error {
return err
}
file.Seek(0, 0)
_, err = file.Seek(0, 0)
if err != nil {
return err
}
copyFile, err := os.OpenFile(copyTarget, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
if err != nil {
return errors.Wrapf(err, "could not open copy target '%s' for writing", copyTarget)

View File

@@ -59,6 +59,11 @@ func New(m map[string]interface{}) (*Options, error) {
m["metadata_backend"] = "hybrid"
}
// debounced scan delay
if o.ScanDebounceDelay == 0 {
o.ScanDebounceDelay = 10 * time.Millisecond
}
do, err := decomposedoptions.New(m)
if err != nil {
return nil, err

View File

@@ -78,7 +78,7 @@ func New(m map[string]interface{}, stream events.Stream, log *zerolog.Logger) (s
var lu *lookup.Lookup
switch o.MetadataBackend {
case "xattrs":
lu = lookup.New(metadata.NewXattrsBackend(o.Root, o.FileMetadataCache), um, o, &timemanager.Manager{})
lu = lookup.New(metadata.NewXattrsBackend(o.FileMetadataCache), um, o, &timemanager.Manager{})
case "hybrid":
lu = lookup.New(metadata.NewHybridBackend(1024, // start offloading grants after 1KB
func(n metadata.MetadataNode) string {

View File

@@ -259,56 +259,71 @@ func (tb *Trashbin) ListRecycle(ctx context.Context, spaceID string, key, relati
}
// RestoreRecycleItem restores the specified item
func (tb *Trashbin) RestoreRecycleItem(ctx context.Context, spaceID string, key, relativePath string, restoreRef *provider.Reference) error {
func (tb *Trashbin) RestoreRecycleItem(ctx context.Context, spaceID string, key, relativePath string, restoreRef *provider.Reference) (*node.Node, error) {
_, span := tracer.Start(ctx, "RestoreRecycleItem")
defer span.End()
trashRoot := filepath.Join(tb.lu.InternalPath(spaceID, spaceID), ".Trash")
trashPath := filepath.Clean(filepath.Join(trashRoot, "files", key+".trashitem", relativePath))
restorePath := ""
// TODO why can we not use NodeFromResource here? It will use walk path. Do trashed items have a problem with that?
restoreBaseNode, err := tb.lu.NodeFromID(ctx, restoreRef.GetResourceId())
if err != nil {
return err
if restoreRef != nil {
restoreBaseNode, err := tb.lu.NodeFromID(ctx, restoreRef.GetResourceId())
if err != nil {
return nil, err
}
restorePath = filepath.Join(restoreBaseNode.InternalPath(), restoreRef.GetPath())
} else {
originalPath, _, err := tb.readInfoFile(trashRoot, key)
if err != nil {
return nil, err
}
restorePath = filepath.Join(tb.lu.InternalPath(spaceID, spaceID), originalPath, relativePath)
}
restorePath := filepath.Join(restoreBaseNode.InternalPath(), restoreRef.GetPath())
// TODO the decomposed trash also checks the permissions on the restore node
_, id, _, err := tb.lu.MetadataBackend().IdentifyPath(ctx, trashPath)
if err != nil {
return err
return nil, err
}
// update parent id in case it was restored to a different location
_, parentID, _, err := tb.lu.MetadataBackend().IdentifyPath(ctx, filepath.Dir(restorePath))
if err != nil {
return err
return nil, err
}
if len(parentID) == 0 {
return fmt.Errorf("trashbin: parent id not found for %s", restorePath)
return nil, fmt.Errorf("trashbin: parent id not found for %s", restorePath)
}
trashNode := &trashNode{spaceID: spaceID, id: id, path: trashPath}
err = tb.lu.MetadataBackend().Set(ctx, trashNode, prefixes.ParentidAttr, []byte(parentID))
if err != nil {
return err
return nil, err
}
// restore the item
err = os.Rename(trashPath, restorePath)
if err != nil {
return err
return nil, err
}
if err := tb.lu.CacheID(ctx, spaceID, string(id), restorePath); err != nil {
tb.log.Error().Err(err).Str("spaceID", spaceID).Str("id", string(id)).Str("path", restorePath).Msg("trashbin: error caching id")
if err := tb.lu.CacheID(ctx, spaceID, id, restorePath); err != nil {
tb.log.Error().Err(err).Str("spaceID", spaceID).Str("id", id).Str("path", restorePath).Msg("trashbin: error caching id")
}
restoredNode, err := tb.lu.NodeFromID(ctx, &provider.ResourceId{SpaceId: spaceID, OpaqueId: id})
if err != nil {
return nil, err
}
// cleanup trash info
if relativePath == "." || relativePath == "/" {
return os.Remove(filepath.Join(trashRoot, "info", key+".trashinfo"))
return restoredNode, os.Remove(filepath.Join(trashRoot, "info", key+".trashinfo"))
} else {
return nil
return restoredNode, nil
}
}
// PurgeRecycleItem purges the specified item, all its children and all their revisions

View File

@@ -197,7 +197,7 @@ func (t *Tree) Scan(path string, action EventAction, isDir bool) error {
})
}
if err := t.setDirty(filepath.Dir(path), true); err != nil {
return err
t.log.Error().Err(err).Str("path", path).Bool("isDir", isDir).Msg("failed to mark directory as dirty")
}
t.scanDebouncer.Debounce(scanItem{
Path: filepath.Dir(path),
@@ -208,7 +208,7 @@ func (t *Tree) Scan(path string, action EventAction, isDir bool) error {
// 2. New directory
// -> scan directory
if err := t.setDirty(path, true); err != nil {
return err
t.log.Error().Err(err).Str("path", path).Bool("isDir", isDir).Msg("failed to mark directory as dirty")
}
t.scanDebouncer.Debounce(scanItem{
Path: path,
@@ -244,8 +244,13 @@ func (t *Tree) Scan(path string, action EventAction, isDir bool) error {
t.log.Debug().Str("path", path).Bool("isDir", isDir).Msg("scanning path (ActionMoveFrom)")
// 6. file/directory moved out of the watched directory
// -> update directory
if err := t.setDirty(filepath.Dir(path), true); err != nil {
return err
err := t.HandleFileDelete(path)
if err != nil {
t.log.Error().Err(err).Str("path", path).Bool("isDir", isDir).Msg("failed to handle deleted item")
}
err = t.setDirty(filepath.Dir(path), true)
if err != nil {
t.log.Error().Err(err).Str("path", path).Bool("isDir", isDir).Msg("failed to mark directory as dirty")
}
go func() { _ = t.WarmupIDCache(filepath.Dir(path), false, true) }()
@@ -258,7 +263,7 @@ func (t *Tree) Scan(path string, action EventAction, isDir bool) error {
err := t.HandleFileDelete(path)
if err != nil {
return err
t.log.Error().Err(err).Str("path", path).Bool("isDir", isDir).Msg("failed to handle deleted item")
}
t.scanDebouncer.Debounce(scanItem{
@@ -342,7 +347,7 @@ func (t *Tree) findSpaceId(path string) (string, node.Attributes, error) {
}
}
return string(spaceID), spaceAttrs, nil
return spaceID, spaceAttrs, nil
}
spaceCandidate = filepath.Dir(spaceCandidate)
}
@@ -387,7 +392,7 @@ func (t *Tree) assimilate(item scanItem) error {
// the file has an id set, we already know it from the past
n := node.NewBaseNode(spaceID, id, t.lookup)
previousPath, ok := t.lookup.GetCachedID(context.Background(), spaceID, string(id))
previousPath, ok := t.lookup.GetCachedID(context.Background(), spaceID, id)
previousParentID, _ := t.lookup.MetadataBackend().Get(context.Background(), n, prefixes.ParentidAttr)
// compare metadata mtime with actual mtime. if it matches AND the path hasn't changed (move operation)
@@ -418,10 +423,10 @@ func (t *Tree) assimilate(item scanItem) error {
// this is a move
t.log.Debug().Str("path", item.Path).Msg("move detected")
if err := t.lookup.CacheID(context.Background(), spaceID, string(id), item.Path); err != nil {
t.log.Error().Err(err).Str("spaceID", spaceID).Str("id", string(id)).Str("path", item.Path).Msg("could not cache id")
if err := t.lookup.CacheID(context.Background(), spaceID, id, item.Path); err != nil {
t.log.Error().Err(err).Str("spaceID", spaceID).Str("id", id).Str("path", item.Path).Msg("could not cache id")
}
_, attrs, err := t.updateFile(item.Path, string(id), spaceID)
_, attrs, err := t.updateFile(item.Path, id, spaceID)
if err != nil {
return err
}
@@ -471,11 +476,11 @@ func (t *Tree) assimilate(item scanItem) error {
} else {
// This item had already been assimilated in the past. Update the path
t.log.Debug().Str("path", item.Path).Msg("updating cached path")
if err := t.lookup.CacheID(context.Background(), spaceID, string(id), item.Path); err != nil {
t.log.Error().Err(err).Str("spaceID", spaceID).Str("id", string(id)).Str("path", item.Path).Msg("could not cache id")
if err := t.lookup.CacheID(context.Background(), spaceID, id, item.Path); err != nil {
t.log.Error().Err(err).Str("spaceID", spaceID).Str("id", id).Str("path", item.Path).Msg("could not cache id")
}
_, _, err := t.updateFile(item.Path, string(id), spaceID)
_, _, err := t.updateFile(item.Path, id, spaceID)
if err != nil {
return err
}
@@ -753,7 +758,7 @@ func (t *Tree) WarmupIDCache(root string, assimilate, onlyDirty bool) error {
}
spaceID, _, _, err = t.lookup.MetadataBackend().IdentifyPath(context.Background(), spaceCandidate)
if err == nil {
if err == nil && len(spaceID) > 0 {
err = scopeSpace(path)
if err != nil {
return err

View File

@@ -223,44 +223,6 @@ func (tp *Tree) DownloadRevision(ctx context.Context, ref *provider.Reference, r
return ri, reader, nil
}
func (tp *Tree) getRevisionNode(ctx context.Context, ref *provider.Reference, revisionKey string, hasPermission func(*provider.ResourcePermissions) bool) (*node.Node, error) {
_, span := tracer.Start(ctx, "getRevisionNode")
defer span.End()
log := appctx.GetLogger(ctx)
// verify revision key format
kp := strings.SplitN(revisionKey, node.RevisionIDDelimiter, 2)
if len(kp) != 2 {
log.Error().Str("revisionKey", revisionKey).Msg("malformed revisionKey")
return nil, errtypes.NotFound(revisionKey)
}
log.Debug().Str("revisionKey", revisionKey).Msg("DownloadRevision")
spaceID := ref.ResourceId.SpaceId
// check if the node is available and has not been deleted
n, err := node.ReadNode(ctx, tp.lookup, spaceID, kp[0], false, nil, false)
if err != nil {
return nil, err
}
if !n.Exists {
err = errtypes.NotFound(filepath.Join(n.ParentID, n.Name))
return nil, err
}
p, err := tp.permissions.AssemblePermissions(ctx, n)
switch {
case err != nil:
return nil, err
case !hasPermission(p):
return nil, errtypes.PermissionDenied(filepath.Join(n.ParentID, n.Name))
}
// Set space owner in context
storagespace.ContextSendSpaceOwnerID(ctx, n.SpaceOwnerOrManager(ctx))
return n, nil
}
func (tp *Tree) RestoreRevision(ctx context.Context, srcNode, targetNode metadata.MetadataNode) error {
source := srcNode.InternalPath()
target := targetNode.InternalPath()
@@ -275,7 +237,10 @@ func (tp *Tree) RestoreRevision(ctx context.Context, srcNode, targetNode metadat
return err
}
defer wf.Close()
wf.Truncate(0)
err = wf.Truncate(0)
if err != nil {
return err
}
if _, err := io.Copy(wf, rf); err != nil {
return err
@@ -293,7 +258,11 @@ func (tp *Tree) RestoreRevision(ctx context.Context, srcNode, targetNode metadat
// always set the node mtime to the current time
mtime := time.Now()
os.Chtimes(target, mtime, mtime)
err = os.Chtimes(target, mtime, mtime)
if err != nil {
return errtypes.InternalError("failed to update times:" + err.Error())
}
err = tp.lookup.MetadataBackend().SetMultiple(ctx, targetNode,
map[string][]byte{
prefixes.MTimeAttr: []byte(mtime.UTC().Format(time.RFC3339Nano)),

View File

@@ -26,7 +26,6 @@ import (
"io/fs"
"os"
"path/filepath"
"regexp"
"strings"
"time"
@@ -62,13 +61,6 @@ func init() {
tracer = otel.Tracer("github.com/cs3org/reva/pkg/storage/pkg/decomposedfs/tree")
}
// Blobstore defines an interface for storing blobs in a blobstore
type Blobstore interface {
Upload(node *node.Node, source, copyTarget string) error
Download(node *node.Node) (io.ReadCloser, error)
Delete(node *node.Node) error
}
type Watcher interface {
Watch(path string)
}
@@ -82,7 +74,7 @@ type scanItem struct {
// Tree manages a hierarchical tree
type Tree struct {
lookup *lookup.Lookup
blobstore Blobstore
blobstore node.Blobstore
trashbin *trashbin.Trashbin
propagator propagator.Propagator
permissions permissions.Permissions
@@ -103,7 +95,7 @@ type Tree struct {
type PermissionCheckFunc func(rp *provider.ResourcePermissions) bool
// New returns a new instance of Tree
func New(lu node.PathLookup, bs Blobstore, um usermapper.Mapper, trashbin *trashbin.Trashbin, permissions permissions.Permissions, o *options.Options, es events.Stream, cache store.Store, log *zerolog.Logger) (*Tree, error) {
func New(lu node.PathLookup, bs node.Blobstore, um usermapper.Mapper, trashbin *trashbin.Trashbin, permissions permissions.Permissions, o *options.Options, es events.Stream, cache store.Store, log *zerolog.Logger) (*Tree, error) {
scanQueue := make(chan scanItem)
t := &Tree{
lookup: lu.(*lookup.Lookup),
@@ -242,7 +234,10 @@ func (t *Tree) TouchFile(ctx context.Context, n *node.Node, markprocessing bool,
if err != nil {
return err
}
t.lookup.TimeManager().OverrideMtime(ctx, n, &attributes, nodeMTime)
err = t.lookup.TimeManager().OverrideMtime(ctx, n, &attributes, nodeMTime)
if err != nil {
return err
}
} else {
fi, err := f.Stat()
if err != nil {
@@ -572,13 +567,13 @@ func (t *Tree) DeleteBlob(node *node.Node) error {
}
// BuildSpaceIDIndexEntry returns the entry for the space id index
func (t *Tree) BuildSpaceIDIndexEntry(spaceID, nodeID string) string {
return nodeID
func (t *Tree) BuildSpaceIDIndexEntry(spaceID string) string {
return spaceID
}
// ResolveSpaceIDIndexEntry returns the space id for the space id index entry
func (t *Tree) ResolveSpaceIDIndexEntry(spaceid, entry string) (string, string, error) {
return spaceid, entry, nil
func (t *Tree) ResolveSpaceIDIndexEntry(spaceID string) (string, error) {
return spaceID, nil
}
// InitNewNode initializes a new node
@@ -652,8 +647,6 @@ func (t *Tree) createDirNode(ctx context.Context, n *node.Node) (err error) {
return n.SetXattrsWithContext(ctx, attributes, false)
}
var nodeIDRegep = regexp.MustCompile(`.*/nodes/([^.]*).*`)
func (t *Tree) isIgnored(path string) bool {
return isLockFile(path) || isTrash(path) || t.isUpload(path) || t.isInternal(path)
}

View File

@@ -130,7 +130,7 @@ type Decomposedfs struct {
}
// NewDefault returns an instance with default components
func NewDefault(m map[string]interface{}, bs tree.Blobstore, es events.Stream, log *zerolog.Logger) (storage.FS, error) {
func NewDefault(m map[string]interface{}, bs node.Blobstore, es events.Stream, log *zerolog.Logger) (storage.FS, error) {
if log == nil {
log = &zerolog.Logger{}
}
@@ -143,9 +143,9 @@ func NewDefault(m map[string]interface{}, bs tree.Blobstore, es events.Stream, l
var lu *lookup.Lookup
switch o.MetadataBackend {
case "xattrs":
lu = lookup.New(metadata.NewXattrsBackend(o.Root, o.FileMetadataCache), o, &timemanager.Manager{})
lu = lookup.New(metadata.NewXattrsBackend(o.FileMetadataCache), o, &timemanager.Manager{})
case "messagepack":
lu = lookup.New(metadata.NewMessagePackBackend(o.Root, o.FileMetadataCache), o, &timemanager.Manager{})
lu = lookup.New(metadata.NewMessagePackBackend(o.FileMetadataCache), o, &timemanager.Manager{})
default:
return nil, fmt.Errorf("unknown metadata backend %s, only 'messagepack' or 'xattrs' (default) supported", o.MetadataBackend)
}
@@ -1292,7 +1292,23 @@ func (fs *Decomposedfs) RestoreRecycleItem(ctx context.Context, space *provider.
return errtypes.NotFound(key)
}
return fs.trashbin.RestoreRecycleItem(ctx, spaceID, key, relativePath, restoreRef)
restoredNode, err := fs.trashbin.RestoreRecycleItem(ctx, spaceID, key, relativePath, restoreRef)
if err != nil {
return err
}
var sizeDiff int64
if restoredNode.IsDir(ctx) {
treeSize, err := restoredNode.GetTreeSize(ctx)
if err != nil {
return err
}
sizeDiff = int64(treeSize)
} else {
sizeDiff = restoredNode.Blobsize
}
return fs.tp.Propagate(ctx, restoredNode, sizeDiff)
}
func (fs *Decomposedfs) PurgeRecycleItem(ctx context.Context, space *provider.Reference, key, relativePath string) error {

View File

@@ -40,7 +40,6 @@ import (
// MessagePackBackend persists the attributes in messagepack format inside the file
type MessagePackBackend struct {
rootPath string
metaCache cache.FileMetadataCache
}
@@ -51,9 +50,8 @@ type readWriteCloseSeekTruncater interface {
}
// NewMessagePackBackend returns a new MessagePackBackend instance
func NewMessagePackBackend(rootPath string, o cache.Config) MessagePackBackend {
func NewMessagePackBackend(o cache.Config) MessagePackBackend {
return MessagePackBackend{
rootPath: filepath.Clean(rootPath),
metaCache: cache.GetFileMetadataCache(o),
}
}
@@ -63,7 +61,7 @@ func (MessagePackBackend) Name() string { return "messagepack" }
// IdentifyPath returns the id and mtime of a file
func (b MessagePackBackend) IdentifyPath(_ context.Context, path string) (string, string, time.Time, error) {
metaPath := filepath.Join(path + ".mpk")
metaPath := filepath.Clean(path + ".mpk")
source, err := os.Open(metaPath)
// No cached entry found. Read from storage and store in cache
if err != nil {

View File

@@ -37,12 +37,11 @@ import (
// XattrsBackend stores the file attributes in extended attributes
type XattrsBackend struct {
rootPath string
metaCache cache.FileMetadataCache
}
// NewXattrsBackend returns a new XattrsBackend instance
func NewXattrsBackend(rootPath string, o cache.Config) XattrsBackend {
func NewXattrsBackend(o cache.Config) XattrsBackend {
return XattrsBackend{
metaCache: cache.GetFileMetadataCache(o),
}

View File

@@ -86,6 +86,13 @@ const (
ProcessingStatus = "processing:"
)
// Blobstore defines an interface for storing blobs in a blobstore
type Blobstore interface {
Upload(node *Node, source, copyTarget string) error
Download(node *Node) (io.ReadCloser, error)
Delete(node *Node) error
}
type TimeManager interface {
// OverrideMtime overrides the mtime of the node, either on the node itself or in the given attributes, depending on the implementation
OverrideMtime(ctx context.Context, n *Node, attrs *Attributes, mtime time.Time) error
@@ -129,8 +136,8 @@ type Tree interface {
ReadBlob(node *Node) (io.ReadCloser, error)
DeleteBlob(node *Node) error
BuildSpaceIDIndexEntry(spaceID, nodeID string) string
ResolveSpaceIDIndexEntry(spaceID, entry string) (string, string, error)
BuildSpaceIDIndexEntry(spaceID string) string
ResolveSpaceIDIndexEntry(spaceID string) (string, error)
CreateRevision(ctx context.Context, n *Node, version string, f *lockedfile.File) (string, error)
ListRevisions(ctx context.Context, ref *provider.Reference) ([]*provider.FileVersion, error)

View File

@@ -129,7 +129,7 @@ func (tb *DecomposedfsTrashbin) ListRecycle(ctx context.Context, spaceID string,
}
item := &provider.RecycleItem{
Type: provider.ResourceType(typeInt),
Size: uint64(size),
Size: size,
Key: filepath.Join(key, relativePath),
DeletionTime: deletionTime,
Ref: &provider.Reference{
@@ -345,7 +345,7 @@ func (tb *DecomposedfsTrashbin) listTrashRoot(ctx context.Context, spaceID strin
}
// RestoreRecycleItem restores the specified item
func (tb *DecomposedfsTrashbin) RestoreRecycleItem(ctx context.Context, spaceID string, key, relativePath string, restoreRef *provider.Reference) error {
func (tb *DecomposedfsTrashbin) RestoreRecycleItem(ctx context.Context, spaceID string, key, relativePath string, restoreRef *provider.Reference) (*node.Node, error) {
_, span := tracer.Start(ctx, "RestoreRecycleItem")
defer span.End()
@@ -353,7 +353,7 @@ func (tb *DecomposedfsTrashbin) RestoreRecycleItem(ctx context.Context, spaceID
if restoreRef != nil {
tn, err := tb.fs.lu.NodeFromResource(ctx, restoreRef)
if err != nil {
return err
return nil, err
}
targetNode = tn
@@ -361,19 +361,19 @@ func (tb *DecomposedfsTrashbin) RestoreRecycleItem(ctx context.Context, spaceID
rn, parent, restoreFunc, err := tb.fs.tp.(*tree.Tree).RestoreRecycleItemFunc(ctx, spaceID, key, relativePath, targetNode)
if err != nil {
return err
return nil, err
}
// check permissions of deleted node
rp, err := tb.fs.p.AssembleTrashPermissions(ctx, rn)
switch {
case err != nil:
return err
return nil, err
case !rp.RestoreRecycleItem:
if rp.Stat {
return errtypes.PermissionDenied(key)
return nil, errtypes.PermissionDenied(key)
}
return errtypes.NotFound(key)
return nil, errtypes.NotFound(key)
}
// Set space owner in context
@@ -383,13 +383,13 @@ func (tb *DecomposedfsTrashbin) RestoreRecycleItem(ctx context.Context, spaceID
pp, err := tb.fs.p.AssemblePermissions(ctx, parent)
switch {
case err != nil:
return err
return nil, err
case !pp.InitiateFileUpload:
// share receiver cannot restore to a shared resource to which she does not have write permissions.
if rp.Stat {
return errtypes.PermissionDenied(key)
return nil, errtypes.PermissionDenied(key)
}
return errtypes.NotFound(key)
return nil, errtypes.NotFound(key)
}
// Run the restore func

View File

@@ -30,6 +30,8 @@ import (
"sync/atomic"
"time"
"maps"
userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
v1beta11 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
@@ -248,7 +250,7 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
var (
spaceID = spaceIDAny
nodeID = spaceIDAny
entry = spaceIDAny
requestedUserID *userv1beta1.UserId
)
@@ -266,8 +268,8 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
spaceTypes[filter[i].GetSpaceType()] = struct{}{}
}
case provider.ListStorageSpacesRequest_Filter_TYPE_ID:
_, spaceID, nodeID, _ = storagespace.SplitID(filter[i].GetId().OpaqueId)
if strings.Contains(nodeID, "/") {
_, spaceID, entry, _ = storagespace.SplitID(filter[i].GetId().OpaqueId)
if strings.Contains(entry, "/") {
return []*provider.StorageSpace{}, nil
}
case provider.ListStorageSpacesRequest_Filter_TYPE_USER:
@@ -296,11 +298,11 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
// /path/to/root/spaces/personal/nodeid
// /path/to/root/spaces/shared/nodeid
if spaceID != spaceIDAny && nodeID != spaceIDAny {
if spaceID != spaceIDAny && entry != spaceIDAny {
// try directly reading the node
n, err := node.ReadNode(ctx, fs.lu, spaceID, nodeID, true, nil, false) // permission to read disabled space is checked later
n, err := node.ReadNode(ctx, fs.lu, spaceID, entry, true, nil, false) // permission to read disabled space is checked later
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Str("id", nodeID).Msg("could not read node")
appctx.GetLogger(ctx).Error().Err(err).Str("id", entry).Msg("could not read node")
return nil, err
}
if !n.Exists {
@@ -332,12 +334,10 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
return nil, errors.Wrap(err, "error reading user index")
}
if nodeID == spaceIDAny {
for spaceID, nodeID := range allMatches {
matches[spaceID] = nodeID
}
if entry == spaceIDAny {
maps.Copy(matches, allMatches)
} else {
matches[allMatches[nodeID]] = allMatches[nodeID]
matches[allMatches[entry]] = allMatches[entry]
}
// get Groups for userid
@@ -359,12 +359,10 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
return nil, errors.Wrap(err, "error reading group index")
}
if nodeID == spaceIDAny {
for spaceID, nodeID := range allMatches {
matches[spaceID] = nodeID
}
if entry == spaceIDAny {
maps.Copy(matches, allMatches)
} else {
matches[allMatches[nodeID]] = allMatches[nodeID]
matches[allMatches[entry]] = allMatches[entry]
}
}
@@ -389,12 +387,10 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
return nil, errors.Wrap(err, "error reading type index")
}
if nodeID == spaceIDAny {
for spaceID, nodeID := range allMatches {
matches[spaceID] = nodeID
}
if entry == spaceIDAny {
maps.Copy(matches, allMatches)
} else {
matches[allMatches[nodeID]] = allMatches[nodeID]
matches[allMatches[entry]] = allMatches[entry]
}
}
}
@@ -417,9 +413,9 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
// Distribute work
errg.Go(func() error {
defer close(work)
for spaceID, nodeID := range matches {
for spaceID, entry := range matches {
select {
case work <- []string{spaceID, nodeID}:
case work <- []string{spaceID, entry}:
case <-ctx.Done():
return ctx.Err()
}
@@ -435,15 +431,15 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
for i := 0; i < numWorkers; i++ {
errg.Go(func() error {
for match := range work {
spaceID, nodeID, err := fs.tp.ResolveSpaceIDIndexEntry(match[0], match[1])
spaceID, err := fs.tp.ResolveSpaceIDIndexEntry(match[1])
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Str("id", nodeID).Msg("resolve space id index entry, skipping")
appctx.GetLogger(ctx).Error().Err(err).Str("id", spaceID).Msg("resolve space id index entry, skipping")
continue
}
n, err := node.ReadNode(ctx, fs.lu, spaceID, nodeID, true, nil, true)
n, err := node.ReadNode(ctx, fs.lu, spaceID, spaceID, true, nil, true)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Str("id", nodeID).Msg("could not read node, skipping")
appctx.GetLogger(ctx).Error().Err(err).Str("id", spaceID).Msg("could not read node, skipping")
continue
}
@@ -459,7 +455,7 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
case errtypes.NotFound:
// ok
default:
appctx.GetLogger(ctx).Error().Err(err).Str("id", nodeID).Msg("could not convert to storage space")
appctx.GetLogger(ctx).Error().Err(err).Str("id", spaceID).Msg("could not convert to storage space")
}
continue
}
@@ -497,9 +493,9 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
}
// if there are no matches (or they happened to be spaces for the owner) and the node is a child return a space
if int64(len(matches)) <= numShares.Load() && nodeID != spaceID {
if int64(len(matches)) <= numShares.Load() && entry != spaceID {
// try node id
n, err := node.ReadNode(ctx, fs.lu, spaceID, nodeID, true, nil, false) // permission to read disabled space is checked in storageSpaceFromNode
n, err := node.ReadNode(ctx, fs.lu, spaceID, entry, true, nil, false) // permission to read disabled space is checked in storageSpaceFromNode
if err != nil {
return nil, err
}
@@ -817,7 +813,7 @@ func (fs *Decomposedfs) DeleteStorageSpace(ctx context.Context, req *provider.De
// - for decomposedfs/decomposeds3 it is the relative link to the space root
// - for the posixfs it is the node id
func (fs *Decomposedfs) updateIndexes(ctx context.Context, grantee *provider.Grantee, spaceType, spaceID, nodeID string) error {
target := fs.tp.BuildSpaceIDIndexEntry(spaceID, nodeID)
target := fs.tp.BuildSpaceIDIndexEntry(spaceID)
err := fs.linkStorageSpaceType(ctx, spaceType, spaceID, target)
if err != nil {
return err

View File

@@ -23,13 +23,14 @@ import (
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/opencloud-eu/reva/v2/pkg/storage"
"github.com/opencloud-eu/reva/v2/pkg/storage/pkg/decomposedfs/node"
)
type Trashbin interface {
Setup(storage.FS) error
ListRecycle(ctx context.Context, spaceID, key, relativePath string) ([]*provider.RecycleItem, error)
RestoreRecycleItem(ctx context.Context, spaceID, key, relativePath string, restoreRef *provider.Reference) error
RestoreRecycleItem(ctx context.Context, spaceID, key, relativePath string, restoreRef *provider.Reference) (*node.Node, error)
PurgeRecycleItem(ctx context.Context, spaceID, key, relativePath string) error
EmptyRecycle(ctx context.Context, spaceID string) error
}

View File

@@ -56,17 +56,10 @@ func init() {
tracer = otel.Tracer("github.com/cs3org/reva/pkg/storage/utils/decomposedfs/tree")
}
// Blobstore defines an interface for storing blobs in a blobstore
type Blobstore interface {
Upload(node *node.Node, source string) error
Download(node *node.Node) (io.ReadCloser, error)
Delete(node *node.Node) error
}
// Tree manages a hierarchical tree
type Tree struct {
lookup node.PathLookup
blobstore Blobstore
blobstore node.Blobstore
propagator propagator.Propagator
permissions permissions.Permissions
@@ -79,7 +72,7 @@ type Tree struct {
type PermissionCheckFunc func(rp *provider.ResourcePermissions) bool
// New returns a new instance of Tree
func New(lu node.PathLookup, bs Blobstore, o *options.Options, p permissions.Permissions, cache store.Store, log *zerolog.Logger) *Tree {
func New(lu node.PathLookup, bs node.Blobstore, o *options.Options, p permissions.Permissions, cache store.Store, log *zerolog.Logger) *Tree {
return &Tree{
lookup: lu,
blobstore: bs,
@@ -524,7 +517,7 @@ func (t *Tree) Delete(ctx context.Context, n *node.Node) (err error) {
}
// RestoreRecycleItemFunc returns a node and a function to restore it from the trash.
func (t *Tree) RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPath string, targetNode *node.Node) (*node.Node, *node.Node, func() error, error) {
func (t *Tree) RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPath string, targetNode *node.Node) (*node.Node, *node.Node, func() (*node.Node, error), error) {
_, span := tracer.Start(ctx, "RestoreRecycleItemFunc")
defer span.End()
logger := appctx.GetLogger(ctx)
@@ -555,9 +548,9 @@ func (t *Tree) RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPa
return nil, nil, nil, err
}
fn := func() error {
fn := func() (*node.Node, error) {
if targetNode.Exists {
return errtypes.AlreadyExists("origin already exists")
return nil, errtypes.AlreadyExists("origin already exists")
}
parts := strings.SplitN(recycleNode.ID, node.TrashIDDelimiter, 2)
@@ -567,18 +560,18 @@ func (t *Tree) RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPa
// add the entry for the parent dir
err = os.Symlink("../../../../../"+lookup.Pathify(originalId, 4, 2), filepath.Join(targetNode.ParentPath(), targetNode.Name))
if err != nil {
return err
return nil, err
}
// attempt to rename only if we're not in a subfolder
if recycleNode.ID != restoreNode.ID {
err = os.Rename(recycleNode.InternalPath(), restoreNode.InternalPath())
if err != nil {
return err
return nil, err
}
err = t.lookup.MetadataBackend().Rename(recycleNode, restoreNode)
if err != nil {
return err
return nil, err
}
}
@@ -590,7 +583,7 @@ func (t *Tree) RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPa
attrs.SetString(prefixes.ParentidAttr, targetNode.ParentID)
if err = t.lookup.MetadataBackend().SetMultiple(ctx, restoreNode, map[string][]byte(attrs), true); err != nil {
return errors.Wrap(err, "Decomposedfs: could not update recycle node")
return nil, errors.Wrap(err, "Decomposedfs: could not update recycle node")
}
// delete item link in trash
@@ -598,7 +591,7 @@ func (t *Tree) RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPa
if trashPath != "" && trashPath != "/" {
resolvedTrashRoot, err := filepath.EvalSymlinks(trashItem)
if err != nil {
return errors.Wrap(err, "Decomposedfs: could not resolve trash root")
return nil, errors.Wrap(err, "Decomposedfs: could not resolve trash root")
}
deletePath = filepath.Join(resolvedTrashRoot, trashPath)
if err = os.Remove(deletePath); err != nil {
@@ -609,18 +602,11 @@ func (t *Tree) RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPa
logger.Error().Err(err).Str("trashItem", trashItem).Str("deletePath", deletePath).Str("trashPath", trashPath).Msg("error recursively deleting trash item")
}
}
var sizeDiff int64
if recycleNode.IsDir(ctx) {
treeSize, err := recycleNode.GetTreeSize(ctx)
if err != nil {
return err
}
sizeDiff = int64(treeSize)
} else {
sizeDiff = recycleNode.Blobsize
}
return t.Propagate(ctx, targetNode, sizeDiff)
rn := node.New(restoreNode.SpaceID, restoreNode.ID, targetNode.ParentID, targetNode.Name, recycleNode.Blobsize, recycleNode.BlobID, recycleNode.Type(ctx), recycleNode.Owner(), t.lookup)
rn.SpaceRoot = targetNode.SpaceRoot
rn.Exists = true
// the recycle node has an id with the trash timestamp, but the propagation is only interested in the parent id
return rn, nil
}
return recycleNode, parent, fn, nil
}
@@ -801,7 +787,7 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err
// WriteBlob writes a blob to the blobstore
func (t *Tree) WriteBlob(node *node.Node, source string) error {
return t.blobstore.Upload(node, source)
return t.blobstore.Upload(node, source, "")
}
// ReadBlob reads a blob from the blobstore
@@ -826,13 +812,14 @@ func (t *Tree) DeleteBlob(node *node.Node) error {
}
// BuildSpaceIDIndexEntry returns the entry for the space id index
func (t *Tree) BuildSpaceIDIndexEntry(spaceID, nodeID string) string {
func (t *Tree) BuildSpaceIDIndexEntry(spaceID string) string {
return "../../../spaces/" + lookup.Pathify(spaceID, 1, 2) + "/nodes/" + lookup.Pathify(spaceID, 4, 2)
}
// ResolveSpaceIDIndexEntry returns the node id for the space id index entry
func (t *Tree) ResolveSpaceIDIndexEntry(_, entry string) (string, string, error) {
return ReadSpaceAndNodeFromIndexLink(entry)
func (t *Tree) ResolveSpaceIDIndexEntry(entry string) (string, error) {
spaceID, _, err := ReadSpaceAndNodeFromIndexLink(entry)
return spaceID, err
}
// ReadSpaceAndNodeFromIndexLink reads a symlink and parses space and node id if the link has the correct format, eg:

View File

@@ -125,7 +125,7 @@ func (store DecomposedFsStore) List(ctx context.Context) ([]*DecomposedFsSession
func (store DecomposedFsStore) Get(ctx context.Context, id string) (*DecomposedFsSession, error) {
sessionPath := sessionPath(store.root, id)
match := _idRegexp.FindStringSubmatch(sessionPath)
if match == nil || len(match) < 2 {
if len(match) < 2 {
return nil, fmt.Errorf("invalid upload path")
}

View File

@@ -125,7 +125,7 @@ func (store OcisStore) List(ctx context.Context) ([]*OcisSession, error) {
func (store OcisStore) Get(ctx context.Context, id string) (*OcisSession, error) {
sessionPath := sessionPath(store.root, id)
match := _idRegexp.FindStringSubmatch(sessionPath)
if match == nil || len(match) < 2 {
if len(match) < 2 {
return nil, fmt.Errorf("invalid upload path")
}

View File

@@ -16,7 +16,7 @@
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
// Code generated by mockery v2.40.2. DO NOT EDIT.
// Code generated by mockery v2.53.2. DO NOT EDIT.
package mocks

View File

@@ -1,4 +1,22 @@
// Code generated by mockery v2.46.3. DO NOT EDIT.
// Copyright 2018-2022 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
// Code generated by mockery v2.53.2. DO NOT EDIT.
package mocks

View File

@@ -14,6 +14,8 @@ type StoreComposer struct {
Concater ConcaterDataStore
UsesLengthDeferrer bool
LengthDeferrer LengthDeferrerDataStore
ContentServer ContentServerDataStore
UsesContentServer bool
}
// NewStoreComposer creates a new and empty store composer.
@@ -85,3 +87,8 @@ func (store *StoreComposer) UseLengthDeferrer(ext LengthDeferrerDataStore) {
store.UsesLengthDeferrer = ext != nil
store.LengthDeferrer = ext
}
// UseContentServer registers a ContentServerDataStore extension on the
// composer, enabling the handler to delegate GET request handling to the
// data store. Passing nil clears the extension (UsesContentServer becomes false).
func (store *StoreComposer) UseContentServer(ext ContentServerDataStore) {
	store.UsesContentServer = ext != nil
	store.ContentServer = ext
}

View File

@@ -75,6 +75,13 @@ type Config struct {
// If the error is non-nil, the error will be forwarded to the client. Furthermore,
// HTTPResponse will be ignored and the error value can contain values for the HTTP response.
PreFinishResponseCallback func(hook HookEvent) (HTTPResponse, error)
// PreUploadTerminateCallback will be invoked on DELETE requests before an upload is terminated,
// giving the application the opportunity to reject the termination. For example, to ensure resources
// used by other services are not deleted.
// If the callback returns no error, optional values from HTTPResponse will be contained in the HTTP response.
// If the error is non-nil, the error will be forwarded to the client. Furthermore,
// HTTPResponse will be ignored and the error value can contain values for the HTTP response.
PreUploadTerminateCallback func(hook HookEvent) (HTTPResponse, error)
// GracefulRequestCompletionTimeout is the timeout for operations to complete after an HTTP
// request has ended (successfully or by error). For example, if an HTTP request is interrupted,
// instead of stopping immediately, the handler and data store will be given some additional

View File

@@ -3,6 +3,7 @@ package handler
import (
"context"
"io"
"net/http"
)
type MetaData map[string]string
@@ -191,3 +192,21 @@ type Lock interface {
// Unlock releases an existing lock for the given upload.
Unlock() error
}
// ServableUpload is an upload whose content can be served directly to HTTP
// clients, bypassing the handler's GetReader-based download path.
type ServableUpload interface {
	// ServeContent serves the uploaded data as specified by the GET request.
	// It allows data stores to delegate the handling of range requests and conditional
	// requests to their underlying providers.
	// The tusd handler will set the Content-Type and Content-Disposition headers
	// before calling ServeContent, but the implementation can override them.
	// After calling ServeContent, the handler will not take any further action
	// other than handling a potential error.
	ServeContent(ctx context.Context, w http.ResponseWriter, r *http.Request) error
}

// ContentServerDataStore is the interface for DataStores that can serve content directly.
// When the handler serves a GET request, it will pass the request to ServeContent
// and delegate its handling to the DataStore, instead of using GetReader to obtain the content.
type ContentServerDataStore interface {
	// AsServableUpload wraps the given upload so its content can be served directly.
	AsServableUpload(upload Upload) ServableUpload
}

View File

@@ -60,6 +60,7 @@ var (
ErrInvalidUploadDeferLength = NewError("ERR_INVALID_UPLOAD_LENGTH_DEFER", "invalid Upload-Defer-Length header", http.StatusBadRequest)
ErrUploadStoppedByServer = NewError("ERR_UPLOAD_STOPPED", "upload has been stopped by server", http.StatusBadRequest)
ErrUploadRejectedByServer = NewError("ERR_UPLOAD_REJECTED", "upload creation has been rejected by server", http.StatusBadRequest)
ErrUploadTerminationRejected = NewError("ERR_UPLOAD_TERMINATION_REJECTED", "upload termination has been rejected by server", http.StatusBadRequest)
ErrUploadInterrupted = NewError("ERR_UPLOAD_INTERRUPTED", "upload has been interrupted by another request for this upload resource", http.StatusBadRequest)
ErrServerShutdown = NewError("ERR_SERVER_SHUTDOWN", "request has been interrupted because the server is shutting down", http.StatusServiceUnavailable)
ErrOriginNotAllowed = NewError("ERR_ORIGIN_NOT_ALLOWED", "request origin is not allowed", http.StatusForbidden)
@@ -177,10 +178,10 @@ func (handler *UnroutedHandler) Middleware(h http.Handler) http.Handler {
// We also update the write deadline, but makes sure that it is larger than the read deadline, so we
// can still write a response in the case of a read timeout.
if err := c.resC.SetReadDeadline(time.Now().Add(handler.config.NetworkTimeout)); err != nil {
c.log.Warn("NetworkControlError", "error", err)
c.log.WarnContext(c, "NetworkControlError", "error", err)
}
if err := c.resC.SetWriteDeadline(time.Now().Add(2 * handler.config.NetworkTimeout)); err != nil {
c.log.Warn("NetworkControlError", "error", err)
c.log.WarnContext(c, "NetworkControlError", "error", err)
}
// Allow overriding the HTTP method. The reason for this is
@@ -190,7 +191,7 @@ func (handler *UnroutedHandler) Middleware(h http.Handler) http.Handler {
r.Method = newMethod
}
c.log.Info("RequestIncoming")
c.log.InfoContext(c, "RequestIncoming")
handler.Metrics.incRequestsTotal(r.Method)
@@ -405,7 +406,7 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request)
handler.Metrics.incUploadsCreated()
c.log = c.log.With("id", id)
c.log.Info("UploadCreated", "size", size, "url", url)
c.log.InfoContext(c, "UploadCreated", "size", size, "url", url)
if handler.config.NotifyCreatedUploads {
handler.CreatedUploads <- newHookEvent(c, info)
@@ -572,7 +573,7 @@ func (handler *UnroutedHandler) PostFileV2(w http.ResponseWriter, r *http.Reques
handler.Metrics.incUploadsCreated()
c.log = c.log.With("id", id)
c.log.Info("UploadCreated", "size", info.Size, "url", url)
c.log.InfoContext(c, "UploadCreated", "size", info.Size, "url", url)
if handler.config.NotifyCreatedUploads {
handler.CreatedUploads <- newHookEvent(c, info)
@@ -891,7 +892,7 @@ func (handler *UnroutedHandler) writeChunk(c *httpContext, resp HTTPResponse, up
maxSize = length
}
c.log.Info("ChunkWriteStart", "maxSize", maxSize, "offset", offset)
c.log.InfoContext(c, "ChunkWriteStart", "maxSize", maxSize, "offset", offset)
var bytesWritten int64
var err error
@@ -907,12 +908,12 @@ func (handler *UnroutedHandler) writeChunk(c *httpContext, resp HTTPResponse, up
// Update the read deadline for every successful read operation. This ensures that the request handler
// keeps going while data is transmitted but that dead connections can also time out and be cleaned up.
if err := c.resC.SetReadDeadline(time.Now().Add(handler.config.NetworkTimeout)); err != nil {
c.log.Warn("NetworkTimeoutError", "error", err)
c.log.WarnContext(c, "NetworkTimeoutError", "error", err)
}
// The write deadline is updated accordingly to ensure that we can also write responses.
if err := c.resC.SetWriteDeadline(time.Now().Add(2 * handler.config.NetworkTimeout)); err != nil {
c.log.Warn("NetworkTimeoutError", "error", err)
c.log.WarnContext(c, "NetworkTimeoutError", "error", err)
}
}
@@ -935,7 +936,7 @@ func (handler *UnroutedHandler) writeChunk(c *httpContext, resp HTTPResponse, up
// it in the response, if the store did not also return an error.
bodyErr := c.body.hasError()
if bodyErr != nil {
c.log.Error("BodyReadError", "error", bodyErr.Error())
c.log.ErrorContext(c, "BodyReadError", "error", bodyErr.Error())
if err == nil {
err = bodyErr
}
@@ -947,12 +948,12 @@ func (handler *UnroutedHandler) writeChunk(c *httpContext, resp HTTPResponse, up
if terminateErr := handler.terminateUpload(c, upload, info); terminateErr != nil {
// We only log this error and not show it to the user since this
// termination error is not relevant to the uploading client
c.log.Error("UploadStopTerminateError", "error", terminateErr.Error())
c.log.ErrorContext(c, "UploadStopTerminateError", "error", terminateErr.Error())
}
}
}
c.log.Info("ChunkWriteComplete", "bytesWritten", bytesWritten)
c.log.InfoContext(c, "ChunkWriteComplete", "bytesWritten", bytesWritten)
// Send new offset to client
newOffset := offset + bytesWritten
@@ -1003,7 +1004,7 @@ func (handler *UnroutedHandler) emitFinishEvents(c *httpContext, resp HTTPRespon
resp = resp.MergeWith(resp2)
}
c.log.Info("UploadFinished", "size", info.Size)
c.log.InfoContext(c, "UploadFinished", "size", info.Size)
handler.Metrics.incUploadsFinished()
if handler.config.NotifyCompleteUploads {
@@ -1047,6 +1048,7 @@ func (handler *UnroutedHandler) GetFile(w http.ResponseWriter, r *http.Request)
return
}
// Fall back to the existing GetReader implementation if ContentServerDataStore is not implemented
contentType, contentDisposition := filterContentType(info)
resp := HTTPResponse{
StatusCode: http.StatusOK,
@@ -1058,6 +1060,27 @@ func (handler *UnroutedHandler) GetFile(w http.ResponseWriter, r *http.Request)
Body: "", // Body is intentionally left empty, and we copy it manually in later.
}
// If the data store implements ContentServerDataStore, delegate the handling
// of GET requests to the data store.
// Otherwise, we will use the existing GetReader implementation.
if handler.composer.UsesContentServer {
servableUpload := handler.composer.ContentServer.AsServableUpload(upload)
// Pass file type and name to the implementation, but it may override them.
w.Header().Set("Content-Type", resp.Header["Content-Type"])
w.Header().Set("Content-Disposition", resp.Header["Content-Disposition"])
// Use loggingResponseWriter to get the ResponseOutgoing log entry that
// normally handler.sendResp would produce.
loggingW := &loggingResponseWriter{ResponseWriter: w, logger: c.log}
err = servableUpload.ServeContent(c, loggingW, r)
if err != nil {
handler.sendError(c, err)
}
return
}
// If no data has been uploaded yet, respond with an empty "204 No Content" status.
if info.Offset == 0 {
resp.StatusCode = http.StatusNoContent
@@ -1065,6 +1088,15 @@ func (handler *UnroutedHandler) GetFile(w http.ResponseWriter, r *http.Request)
return
}
if handler.composer.UsesContentServer {
servableUpload := handler.composer.ContentServer.AsServableUpload(upload)
err = servableUpload.ServeContent(c, w, r)
if err != nil {
handler.sendError(c, err)
}
return
}
src, err := upload.GetReader(c)
if err != nil {
handler.sendError(c, err)
@@ -1172,7 +1204,7 @@ func (handler *UnroutedHandler) DelFile(w http.ResponseWriter, r *http.Request)
}
var info FileInfo
if handler.config.NotifyTerminatedUploads {
if handler.config.NotifyTerminatedUploads || handler.config.PreUploadTerminateCallback != nil {
info, err = upload.GetInfo(c)
if err != nil {
handler.sendError(c, err)
@@ -1180,15 +1212,26 @@ func (handler *UnroutedHandler) DelFile(w http.ResponseWriter, r *http.Request)
}
}
resp := HTTPResponse{
StatusCode: http.StatusNoContent,
}
if handler.config.PreUploadTerminateCallback != nil {
resp2, err := handler.config.PreUploadTerminateCallback(newHookEvent(c, info))
if err != nil {
handler.sendError(c, err)
return
}
resp = resp.MergeWith(resp2)
}
err = handler.terminateUpload(c, upload, info)
if err != nil {
handler.sendError(c, err)
return
}
handler.sendResp(c, HTTPResponse{
StatusCode: http.StatusNoContent,
})
handler.sendResp(c, resp)
}
// terminateUpload passes a given upload to the DataStore's Terminater,
@@ -1208,7 +1251,7 @@ func (handler *UnroutedHandler) terminateUpload(c *httpContext, upload Upload, i
handler.TerminatedUploads <- newHookEvent(c, info)
}
c.log.Info("UploadTerminated")
c.log.InfoContext(c, "UploadTerminated")
handler.Metrics.incUploadsTerminated()
return nil
@@ -1222,7 +1265,7 @@ func (handler *UnroutedHandler) sendError(c *httpContext, err error) {
var detailedErr Error
if !errors.As(err, &detailedErr) {
c.log.Error("InternalServerError", "message", err.Error())
c.log.ErrorContext(c, "InternalServerError", "message", err.Error())
detailedErr = NewError("ERR_INTERNAL_SERVER_ERROR", err.Error(), http.StatusInternalServerError)
}
@@ -1240,7 +1283,7 @@ func (handler *UnroutedHandler) sendError(c *httpContext, err error) {
func (handler *UnroutedHandler) sendResp(c *httpContext, resp HTTPResponse) {
resp.writeTo(c.res)
c.log.Info("ResponseOutgoing", "status", resp.StatusCode, "body", resp.Body)
c.log.InfoContext(c, "ResponseOutgoing", "status", resp.StatusCode, "body", resp.Body)
}
// Make an absolute URLs to the given upload id. If the base path is absolute
@@ -1323,6 +1366,14 @@ func getHostAndProtocol(r *http.Request, allowForwarded bool) (host, proto strin
}
}
// Remove default ports
if proto == "http" {
host = strings.TrimSuffix(host, ":80")
}
if proto == "https" {
host = strings.TrimSuffix(host, ":443")
}
return
}
@@ -1393,7 +1444,7 @@ func (handler *UnroutedHandler) lockUpload(c *httpContext, id string) (Lock, err
// No need to wrap this in a sync.OnceFunc because c.cancel will be a noop after the first call.
releaseLock := func() {
c.log.Info("UploadInterrupted")
c.log.InfoContext(c, "UploadInterrupted")
c.cancel(ErrUploadInterrupted)
}
@@ -1671,3 +1722,20 @@ func validateUploadId(newId string) error {
return nil
}
// loggingResponseWriter is a wrapper around http.ResponseWriter that logs the
// final status code similar to UnroutedHandler.sendResp.
type loggingResponseWriter struct {
	http.ResponseWriter
	logger *slog.Logger // destination for the ResponseOutgoing log entry
}

// WriteHeader logs the outgoing status code and then delegates to the wrapped
// ResponseWriter. Codes below 200 (1xx informational responses) are not
// logged, so only the final response status produces a ResponseOutgoing entry.
func (w *loggingResponseWriter) WriteHeader(statusCode int) {
	if statusCode >= 200 {
		w.logger.Info("ResponseOutgoing", "status", statusCode)
	}
	w.ResponseWriter.WriteHeader(statusCode)
}

// Unwrap provides access to the underlying http.ResponseWriter.
func (w *loggingResponseWriter) Unwrap() http.ResponseWriter { return w.ResponseWriter }