Merge pull request #10712 from aduffeck/cleanup-stale-shares

Cleanup stale shares
Jörn Friedrich Dreyer authored on 2024-12-03 15:10:08 +01:00, committed by GitHub
32 changed files with 597 additions and 60 deletions

go.mod (2 changes)

@@ -17,7 +17,7 @@ require (
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/coreos/go-oidc/v3 v3.11.0
github.com/cs3org/go-cs3apis v0.0.0-20241105092511-3ad35d174fc1
github.com/cs3org/reva/v2 v2.26.7
github.com/cs3org/reva/v2 v2.26.8-0.20241203081301-17f339546533
github.com/davidbyttow/govips/v2 v2.15.0
github.com/dhowden/tag v0.0.0-20240417053706-3d75831295e8
github.com/dutchcoders/go-clamd v0.0.0-20170520113014-b970184f4d9e

go.sum (2 changes)

@@ -257,6 +257,8 @@ github.com/cs3org/go-cs3apis v0.0.0-20241105092511-3ad35d174fc1 h1:RU6LT6mkD16xZ
github.com/cs3org/go-cs3apis v0.0.0-20241105092511-3ad35d174fc1/go.mod h1:DedpcqXl193qF/08Y04IO0PpxyyMu8+GrkD6kWK2MEQ=
github.com/cs3org/reva/v2 v2.26.7 h1:E5b1+H5ZsnmDgWWS/u3t4PtdmiMaY1bEEYVI/vE9xo8=
github.com/cs3org/reva/v2 v2.26.7/go.mod h1:xC5N2XOrCRim/W55uyMsew8RwwFZbQ4hIaKshIbyToo=
github.com/cs3org/reva/v2 v2.26.8-0.20241203081301-17f339546533 h1:QshDjljk44ASolJwlHxE9e7u+Slgdi/VfPKYvbfFu2g=
github.com/cs3org/reva/v2 v2.26.8-0.20241203081301-17f339546533/go.mod h1:fJWmn7EkttWOWphZfiKdFOcHuthcUsU55aSN1VeTOhU=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
github.com/cyphar/filepath-securejoin v0.2.4 h1:Ugdm7cg7i6ZK6x3xDF1oEu1nfkyfH53EtKeQYTC3kyg=
github.com/cyphar/filepath-securejoin v0.2.4/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4=


@@ -519,7 +519,7 @@ func revaShareConfig(cfg *sharing.Config) map[string]interface{} {
"machine_auth_apikey": cfg.UserSharingDrivers.CS3.SystemUserAPIKey,
},
"jsoncs3": map[string]interface{}{
"gateway_addr": cfg.UserSharingDrivers.JSONCS3.ProviderAddr,
"gateway_addr": cfg.Reva.Address,
"provider_addr": cfg.UserSharingDrivers.JSONCS3.ProviderAddr,
"service_user_id": cfg.UserSharingDrivers.JSONCS3.SystemUserID,
"service_user_idp": cfg.UserSharingDrivers.JSONCS3.SystemUserIDP,

ocis/pkg/command/shares.go (new file, 128 lines)

@@ -0,0 +1,128 @@
package command
import (
"errors"
"github.com/rs/zerolog"
"github.com/urfave/cli/v2"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3"
"github.com/cs3org/reva/v2/pkg/share/manager/registry"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/owncloud/ocis/v2/ocis-pkg/config"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
"github.com/owncloud/ocis/v2/ocis-pkg/config/parser"
mregistry "github.com/owncloud/ocis/v2/ocis-pkg/registry"
"github.com/owncloud/ocis/v2/ocis/pkg/register"
sharingparser "github.com/owncloud/ocis/v2/services/sharing/pkg/config/parser"
)
// SharesCommand is the entrypoint for the shares command.
func SharesCommand(cfg *config.Config) *cli.Command {
return &cli.Command{
Name: "shares",
Usage: `cli tools to manage entries in the share manager.`,
Category: "maintenance",
Before: func(c *cli.Context) error {
// Parse base config
if err := parser.ParseConfig(cfg, true); err != nil {
return configlog.ReturnError(err)
}
// Parse sharing config
cfg.Sharing.Commons = cfg.Commons
return configlog.ReturnError(sharingparser.ParseConfig(cfg.Sharing))
},
Subcommands: []*cli.Command{
cleanupCmd(cfg),
},
}
}
func init() {
register.AddCommand(SharesCommand)
}
func cleanupCmd(cfg *config.Config) *cli.Command {
return &cli.Command{
Name: "cleanup",
Usage: `clean up stale entries in the share manager.`,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "service-account-id",
Value: "",
Usage: "Name of the service account to use for the cleanup",
EnvVars: []string{"OCIS_SERVICE_ACCOUNT_ID"},
Required: true,
},
&cli.StringFlag{
Name: "service-account-secret",
Value: "",
Usage: "Secret for the service account",
EnvVars: []string{"OCIS_SERVICE_ACCOUNT_SECRET"},
Required: true,
},
},
Before: func(c *cli.Context) error {
// Parse base config
if err := parser.ParseConfig(cfg, true); err != nil {
return configlog.ReturnError(err)
}
// Parse sharing config
cfg.Sharing.Commons = cfg.Commons
return configlog.ReturnError(sharingparser.ParseConfig(cfg.Sharing))
},
Action: func(c *cli.Context) error {
return cleanup(c, cfg)
},
}
}
func cleanup(c *cli.Context, cfg *config.Config) error {
driver := cfg.Sharing.UserSharingDriver
// cleanup is only implemented for the jsoncs3 share manager
if driver != "jsoncs3" {
return configlog.ReturnError(errors.New("cleanup is only implemented for the jsoncs3 share manager"))
}
rcfg := revaShareConfig(cfg.Sharing)
f, ok := registry.NewFuncs[driver]
if !ok {
return configlog.ReturnError(errors.New("Unknown share manager type '" + driver + "'"))
}
mgr, err := f(rcfg[driver].(map[string]interface{}))
if err != nil {
return configlog.ReturnError(err)
}
// Initialize registry to make service lookup work
_ = mregistry.GetRegistry()
// get an authenticated context
gatewaySelector, err := pool.GatewaySelector(cfg.Sharing.Reva.Address)
if err != nil {
return configlog.ReturnError(err)
}
client, err := gatewaySelector.Next()
if err != nil {
return configlog.ReturnError(err)
}
serviceUserCtx, err := utils.GetServiceUserContext(c.String("service-account-id"), client, c.String("service-account-secret"))
if err != nil {
return configlog.ReturnError(err)
}
l := logger()
zerolog.SetGlobalLevel(zerolog.InfoLevel)
serviceUserCtx = l.WithContext(serviceUserCtx)
mgr.(*jsoncs3.Manager).CleanupStaleShares(serviceUserCtx)
return nil
}
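
The new subcommand above parses the sharing configuration, instantiates the jsoncs3 share manager directly, authenticates with a service account against the gateway, and then calls CleanupStaleShares. A hedged invocation sketch, not part of the diff: the flag and environment variable names are taken from the code above, the binary name ocis is assumed from the package path, and the values are placeholders:

ocis shares cleanup --service-account-id <service-user-id> --service-account-secret <secret>

# equivalently, using the environment variables declared for the flags
OCIS_SERVICE_ACCOUNT_ID=<service-user-id> OCIS_SERVICE_ACCOUNT_SECRET=<secret> ocis shares cleanup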


@@ -520,12 +520,7 @@ func (s *service) UpdateReceivedShare(ctx context.Context, req *collaboration.Up
isMountPointSet := slices.Contains(req.GetUpdateMask().GetPaths(), _fieldMaskPathMountPoint) && req.GetShare().GetMountPoint().GetPath() != ""
// we calculate a valid mountpoint only if the share should be accepted and the mount point is not set explicitly
if isStateTransitionShareAccepted && !isMountPointSet {
gatewayClient, err := s.gatewaySelector.Next()
if err != nil {
return nil, err
}
s, err := setReceivedShareMountPoint(ctx, gatewayClient, req)
s, err := s.setReceivedShareMountPoint(ctx, req)
switch {
case err != nil:
fallthrough
@@ -556,7 +551,11 @@ func (s *service) UpdateReceivedShare(ctx context.Context, req *collaboration.Up
}
}
func setReceivedShareMountPoint(ctx context.Context, gwc gateway.GatewayAPIClient, req *collaboration.UpdateReceivedShareRequest) (*rpc.Status, error) {
func (s *service) setReceivedShareMountPoint(ctx context.Context, req *collaboration.UpdateReceivedShareRequest) (*rpc.Status, error) {
gwc, err := s.gatewaySelector.Next()
if err != nil {
return nil, err
}
receivedShare, err := gwc.GetReceivedShare(ctx, &collaboration.GetReceivedShareRequest{
Ref: &collaboration.ShareReference{
Spec: &collaboration.ShareReference_Id{
@@ -575,6 +574,10 @@ func setReceivedShareMountPoint(ctx context.Context, gwc gateway.GatewayAPIClien
return status.NewOK(ctx), nil
}
gwc, err = s.gatewaySelector.Next()
if err != nil {
return nil, err
}
resourceStat, err := gwc.Stat(ctx, &provider.StatRequest{
Ref: &provider.Reference{
ResourceId: receivedShare.GetShare().GetShare().GetResourceId(),
@@ -592,11 +595,15 @@ func setReceivedShareMountPoint(ctx context.Context, gwc gateway.GatewayAPIClien
var userID *userpb.UserId
_ = utils.ReadJSONFromOpaque(req.Opaque, "userid", &userID)
receivedShares, err := s.sm.ListReceivedShares(ctx, []*collaboration.Filter{}, userID)
if err != nil {
return nil, err
}
// check if the requested mount point is available and if not, find a suitable one
availableMountpoint, _, err := GetMountpointAndUnmountedShares(ctx, gwc,
availableMountpoint, _, err := getMountpointAndUnmountedShares(ctx, receivedShares, s.gatewaySelector, nil,
resourceStat.GetInfo().GetId(),
resourceStat.GetInfo().GetName(),
userID,
)
if err != nil {
return status.NewInternal(ctx, err.Error()), nil
@@ -620,7 +627,6 @@ func GetMountpointAndUnmountedShares(ctx context.Context, gwc gateway.GatewayAPI
if userId != nil {
listReceivedSharesReq.Opaque = utils.AppendJSONToOpaque(nil, "userid", userId)
}
listReceivedSharesRes, err := gwc.ListReceivedShares(ctx, listReceivedSharesReq)
if err != nil {
return "", nil, errtypes.InternalError("grpc list received shares request failed")
@@ -630,17 +636,30 @@ func GetMountpointAndUnmountedShares(ctx context.Context, gwc gateway.GatewayAPI
return "", nil, err
}
return getMountpointAndUnmountedShares(ctx, listReceivedSharesRes.GetShares(), nil, gwc, id, name)
}
// GetMountpointAndUnmountedShares returns a new or existing mountpoint for the given info and produces a list of unmounted received shares for the same resource
func getMountpointAndUnmountedShares(ctx context.Context, receivedShares []*collaboration.ReceivedShare, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], gwc gateway.GatewayAPIClient, id *provider.ResourceId, name string) (string, []*collaboration.ReceivedShare, error) {
unmountedShares := []*collaboration.ReceivedShare{}
base := filepath.Clean(name)
mount := base
existingMountpoint := ""
mountedShares := make([]string, 0, len(listReceivedSharesRes.GetShares()))
mountedShares := make([]string, 0, len(receivedShares))
var pathExists bool
var err error
for _, s := range listReceivedSharesRes.GetShares() {
for _, s := range receivedShares {
resourceIDEqual := utils.ResourceIDEqual(s.GetShare().GetResourceId(), id)
if resourceIDEqual && s.State == collaboration.ShareState_SHARE_STATE_ACCEPTED {
if gatewaySelector != nil {
gwc, err = gatewaySelector.Next()
if err != nil {
return "", nil, err
}
}
// a share to the resource already exists and is mounted, remembers the mount point
_, err := utils.GetResourceByID(ctx, s.GetShare().GetResourceId(), gwc)
if err == nil {
@@ -658,6 +677,12 @@ func GetMountpointAndUnmountedShares(ctx context.Context, gwc gateway.GatewayAPI
mountedShares = append(mountedShares, s.GetMountPoint().GetPath())
if s.GetMountPoint().GetPath() == mount {
// does the shared resource still exist?
if gatewaySelector != nil {
gwc, err = gatewaySelector.Next()
if err != nil {
return "", nil, err
}
}
_, err := utils.GetResourceByID(ctx, s.GetShare().GetResourceId(), gwc)
if err == nil {
pathExists = true
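
The refactor above changes the mount-point helper to operate on an already-fetched list of received shares and to take a gateway selector, requesting a client right before each remote call instead of reusing a single client for the whole loop. A minimal illustrative sketch of that pattern with simplified stand-in types (not the reva gateway interfaces):

type gatewayClient interface {
	StatByID(id string) error
}

type clientSelector interface {
	Next() (gatewayClient, error)
}

func statAll(sel clientSelector, ids []string) error {
	for _, id := range ids {
		gwc, err := sel.Next() // may hand out a different, healthy gateway instance per call
		if err != nil {
			return err
		}
		if err := gwc.StatByID(id); err != nil {
			return err
		}
	}
	return nil
}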


@@ -133,12 +133,14 @@ func (s *svc) handleProppatch(ctx context.Context, w http.ResponseWriter, r *htt
rreq := &provider.UnsetArbitraryMetadataRequest{
Ref: ref,
ArbitraryMetadataKeys: []string{""},
LockId: requestLockToken(r),
}
sreq := &provider.SetArbitraryMetadataRequest{
Ref: ref,
ArbitraryMetadata: &provider.ArbitraryMetadata{
Metadata: map[string]string{},
},
LockId: requestLockToken(r),
}
acceptedProps := []xml.Name{}
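
The hunk above passes the request's lock token into both the UnsetArbitraryMetadata and SetArbitraryMetadata requests so a PROPPATCH on a locked resource can succeed when the client presents the right token. requestLockToken is assumed to pull that token out of the request headers; the sketch below only illustrates the idea with the standard library and is not the reva implementation (real RFC 4918 If-header parsing is more involved):

package sketch

import (
	"net/http"
	"strings"
)

// lockTokenFromIfHeader returns the first coded token found in a WebDAV If header,
// e.g. If: (<opaquelocktoken:deadbeef>) yields "opaquelocktoken:deadbeef".
func lockTokenFromIfHeader(r *http.Request) string {
	h := r.Header.Get("If")
	start := strings.Index(h, "<")
	end := strings.Index(h, ">")
	if start == -1 || end == -1 || end < start {
		return ""
	}
	return h[start+1 : end]
}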


@@ -34,6 +34,7 @@ import (
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/cs3org/reva/v2/pkg/logger"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/share"
"github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/providercache"
@@ -114,6 +115,12 @@ func init() {
registry.Register("jsoncs3", NewDefault)
}
var (
_registeredEvents = []events.Unmarshaller{
events.SpaceDeleted{},
}
)
type config struct {
GatewayAddr string `mapstructure:"gateway_addr"`
MaxConcurrency int `mapstructure:"max_concurrency"`
@@ -188,7 +195,8 @@ func NewDefault(m map[string]interface{}) (share.Manager, error) {
// New returns a new manager instance.
func New(s metadata.Storage, gatewaySelector pool.Selectable[gatewayv1beta1.GatewayAPIClient], ttlSeconds int, es events.Stream, maxconcurrency int) (*Manager, error) {
ttl := time.Duration(ttlSeconds) * time.Second
return &Manager{
m := &Manager{
Cache: providercache.New(s, ttl),
CreatedCache: sharecache.New(s, "users", "created.json", ttl),
UserReceivedStates: receivedsharecache.New(s, ttl),
@@ -197,7 +205,18 @@ func New(s metadata.Storage, gatewaySelector pool.Selectable[gatewayv1beta1.Gate
gatewaySelector: gatewaySelector,
eventStream: es,
MaxConcurrency: maxconcurrency,
}, nil
}
// listen for events
if m.eventStream != nil {
ch, err := events.Consume(m.eventStream, "jsoncs3sharemanager", _registeredEvents...)
if err != nil {
appctx.GetLogger(context.Background()).Error().Err(err).Msg("error consuming events")
}
go m.ProcessEvents(ch)
}
return m, nil
}
func (m *Manager) initialize(ctx context.Context) error {
@@ -248,6 +267,22 @@ func (m *Manager) initialize(ctx context.Context) error {
return nil
}
func (m *Manager) ProcessEvents(ch <-chan events.Event) {
log := logger.New()
for event := range ch {
ctx := context.Background()
if err := m.initialize(ctx); err != nil {
log.Error().Err(err).Msg("error initializing manager")
}
if ev, ok := event.Event.(events.SpaceDeleted); ok {
log.Debug().Msgf("space deleted event: %v", ev)
go func() { m.purgeSpace(ctx, ev.ID) }()
}
}
}
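
With the event stream wired in, the jsoncs3 manager now consumes SpaceDeleted events and purges all shares of the deleted space in the background. The shape of that loop in isolation, with a plain struct and callbacks instead of the reva events types (an illustration, not the actual API):

type spaceDeletedEvent struct {
	SpaceID string
}

func runEventLoop(ch <-chan interface{}, initialize func() error, purge func(spaceID string)) {
	for ev := range ch {
		if err := initialize(); err != nil {
			// the manager above only logs this error and keeps going
		}
		if e, ok := ev.(spaceDeletedEvent); ok {
			go purge(e.SpaceID) // purge asynchronously so the loop keeps draining the channel
		}
	}
}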
// Share creates a new share
func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *collaboration.ShareGrant) (*collaboration.Share, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Share")
@@ -420,7 +455,7 @@ func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReferenc
return nil, err
}
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
if err := m.removeShare(ctx, s, false); err != nil {
sublog.Error().Err(err).
Msg("failed to unshare expired share")
}
@@ -485,7 +520,7 @@ func (m *Manager) Unshare(ctx context.Context, ref *collaboration.ShareReference
return errtypes.NotFound(ref.String())
}
return m.removeShare(ctx, s)
return m.removeShare(ctx, s, false)
}
// UpdateShare updates the mode of the given share.
@@ -622,7 +657,7 @@ func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, f
resourceID := s.GetResourceId()
sublog = sublog.With().Str("storageid", resourceID.GetStorageId()).Str("spaceid", resourceID.GetSpaceId()).Str("opaqueid", resourceID.GetOpaqueId()).Logger()
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
if err := m.removeShare(ctx, s, false); err != nil {
sublog.Error().Err(err).
Msg("failed to unshare expired share")
}
@@ -740,7 +775,7 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User,
continue
}
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
if err := m.removeShare(ctx, s, false); err != nil {
sublog.Error().Err(err).
Msg("failed to unshare expired share")
}
@@ -901,12 +936,18 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati
}
for shareID, state := range w.rspace.States {
s, err := m.Cache.Get(ctx, storageID, spaceID, shareID, true)
if err != nil || s == nil {
if err != nil {
sublogr.Error().Err(err).Msg("could not retrieve share")
continue
}
if s == nil {
sublogr.Warn().Str("shareid", shareID).Msg("share not found. cleaning up")
_ = m.UserReceivedStates.Remove(ctx, user.Id.OpaqueId, w.ssid, shareID)
continue
}
sublogr = sublogr.With().Str("shareid", shareID).Logger()
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
if err := m.removeShare(ctx, s, false); err != nil {
sublogr.Error().Err(err).
Msg("failed to unshare expired share")
}
@@ -1009,7 +1050,7 @@ func (m *Manager) getReceived(ctx context.Context, ref *collaboration.ShareRefer
return nil, errtypes.NotFound(ref.String())
}
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
if err := m.removeShare(ctx, s, false); err != nil {
sublog.Error().Err(err).
Msg("failed to unshare expired share")
}
@@ -1136,24 +1177,107 @@ func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Shar
return nil
}
func (m *Manager) removeShare(ctx context.Context, s *collaboration.Share) error {
func (m *Manager) purgeSpace(ctx context.Context, id *provider.StorageSpaceId) {
log := appctx.GetLogger(ctx)
storageID, spaceID := storagespace.SplitStorageID(id.OpaqueId)
shares, err := m.Cache.ListSpace(ctx, storageID, spaceID)
if err != nil {
log.Error().Err(err).Msg("error listing shares in space")
return
}
// iterate over all shares in the space and remove them
for _, share := range shares.Shares {
err := m.removeShare(ctx, share, true)
if err != nil {
log.Error().Err(err).Msg("error removing share")
}
}
// remove all shares in the space
err = m.Cache.PurgeSpace(ctx, storageID, spaceID)
if err != nil {
log.Error().Err(err).Msg("error purging space")
}
}
func (m *Manager) removeShare(ctx context.Context, s *collaboration.Share, skipSpaceCache bool) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "removeShare")
defer span.End()
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
storageID, spaceID, _ := shareid.Decode(s.Id.OpaqueId)
err := m.Cache.Remove(ctx, storageID, spaceID, s.Id.OpaqueId)
if !skipSpaceCache {
eg.Go(func() error {
storageID, spaceID, _ := shareid.Decode(s.Id.OpaqueId)
err := m.Cache.Remove(ctx, storageID, spaceID, s.Id.OpaqueId)
return err
})
return err
})
}
eg.Go(func() error {
// remove from created cache
return m.CreatedCache.Remove(ctx, s.GetCreator().GetOpaqueId(), s.Id.OpaqueId)
})
// TODO remove from grantee cache
eg.Go(func() error {
// remove from user received states
if s.GetGrantee().Type == provider.GranteeType_GRANTEE_TYPE_USER {
return m.UserReceivedStates.Remove(ctx, s.GetGrantee().GetUserId().GetOpaqueId(), s.GetResourceId().GetStorageId()+shareid.IDDelimiter+s.GetResourceId().GetSpaceId(), s.Id.OpaqueId)
} else if s.GetGrantee().Type == provider.GranteeType_GRANTEE_TYPE_GROUP {
return m.GroupReceivedCache.Remove(ctx, s.GetGrantee().GetGroupId().GetOpaqueId(), s.Id.OpaqueId)
}
return nil
})
return eg.Wait()
}
func (m *Manager) CleanupStaleShares(ctx context.Context) {
log := appctx.GetLogger(ctx)
if err := m.initialize(ctx); err != nil {
return
}
// list all shares
providers, err := m.Cache.All(ctx)
if err != nil {
log.Error().Err(err).Msg("error listing all shares")
return
}
client, err := m.gatewaySelector.Next()
if err != nil {
log.Error().Err(err).Msg("could not get gateway client")
}
providers.Range(func(storage string, spaces *providercache.Spaces) bool {
log.Info().Str("storage", storage).Interface("spaceCount", spaces.Spaces.Count()).Msg("checking storage")
spaces.Spaces.Range(func(space string, shares *providercache.Shares) bool {
log.Info().Str("storage", storage).Str("space", space).Interface("shareCount", len(shares.Shares)).Msg("checking space")
for _, s := range shares.Shares {
req := &provider.StatRequest{
Ref: &provider.Reference{ResourceId: s.ResourceId, Path: "."},
}
res, err := client.Stat(ctx, req)
if err != nil {
log.Error().Err(err).Str("storage", storage).Str("space", space).Msg("could not stat shared resource")
}
if res.Status.Code == rpcv1beta1.Code_CODE_NOT_FOUND {
log.Info().Str("storage", storage).Str("space", space).Msg("shared resource does not exist anymore. cleaning up shares")
if err := m.removeShare(ctx, s, false); err != nil {
log.Error().Err(err).Str("storage", storage).Str("space", space).Msg("could not remove share")
}
}
}
return true
})
return true
})
}
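
CleanupStaleShares walks every cached storage and space, stats each shared resource through the gateway, and removes shares whose resource reports NOT_FOUND. Its core decision, reduced to hypothetical callbacks (an illustration, not the manager's API):

func pruneStaleShares(shareIDs []string, resourceExists func(shareID string) (bool, error), remove func(shareID string) error) {
	for _, id := range shareIDs {
		exists, err := resourceExists(id)
		if err != nil {
			continue // the manager above only logs stat errors; here we simply keep the share
		}
		if !exists {
			_ = remove(id) // the resource is gone, drop the stale share
		}
	}
}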


@@ -25,6 +25,7 @@ import (
"os"
"path"
"path/filepath"
"strings"
"sync"
"time"
@@ -319,6 +320,36 @@ func (c *Cache) Get(ctx context.Context, storageID, spaceID, shareID string, ski
return space.Shares[shareID], nil
}
// All returns all entries in the storage
func (c *Cache) All(ctx context.Context) (*mtimesyncedcache.Map[string, *Spaces], error) {
ctx, span := tracer.Start(ctx, "All")
defer span.End()
providers, err := c.storage.ListDir(ctx, "/storages")
if err != nil {
return nil, err
}
for _, provider := range providers {
storageID := provider.Name
spaces, err := c.storage.ListDir(ctx, path.Join("/storages", storageID))
if err != nil {
return nil, err
}
for _, space := range spaces {
spaceID := strings.TrimSuffix(space.Name, ".json")
unlock := c.LockSpace(spaceID)
span.AddEvent("got lock for space " + spaceID)
if err := c.syncWithLock(ctx, storageID, spaceID); err != nil {
return nil, err
}
unlock()
}
}
return &c.Providers, nil
}
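
Cache.All syncs every space file it finds under /storages/<storageID>/<spaceID>.json into the in-memory cache before handing back the provider map. An illustrative walk over that layout using only the standard library; the real cache goes through a metadata.Storage ListDir, which may be CS3-backed rather than a local directory:

package sketch

import (
	"os"
	"path/filepath"
	"strings"
)

// listStorageSpaces maps storage IDs to the space IDs stored below them,
// mirroring the /storages/<storageID>/<spaceID>.json layout iterated above.
func listStorageSpaces(root string) (map[string][]string, error) {
	out := map[string][]string{}
	storages, err := os.ReadDir(filepath.Join(root, "storages"))
	if err != nil {
		return nil, err
	}
	for _, storage := range storages {
		spaces, err := os.ReadDir(filepath.Join(root, "storages", storage.Name()))
		if err != nil {
			return nil, err
		}
		for _, space := range spaces {
			out[storage.Name()] = append(out[storage.Name()], strings.TrimSuffix(space.Name(), ".json"))
		}
	}
	return out, nil
}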
// ListSpace returns the list of shares in a given space
func (c *Cache) ListSpace(ctx context.Context, storageID, spaceID string) (*Shares, error) {
ctx, span := tracer.Start(ctx, "ListSpace")
@@ -418,6 +449,35 @@ func (c *Cache) Persist(ctx context.Context, storageID, spaceID string) error {
return nil
}
// PurgeSpace removes a space from the cache
func (c *Cache) PurgeSpace(ctx context.Context, storageID, spaceID string) error {
ctx, span := tracer.Start(ctx, "PurgeSpace")
defer span.End()
unlock := c.LockSpace(spaceID)
defer unlock()
span.AddEvent("got lock")
if !c.isSpaceCached(storageID, spaceID) {
err := c.syncWithLock(ctx, storageID, spaceID)
if err != nil {
return err
}
}
spaces, ok := c.Providers.Load(storageID)
if !ok {
return nil
}
newShares := &Shares{}
if space, ok := spaces.Spaces.Load(spaceID); ok {
newShares.Etag = space.Etag // keep the etag to allow overwriting the state on the server
}
spaces.Spaces.Store(spaceID, newShares)
return c.Persist(ctx, storageID, spaceID)
}
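
PurgeSpace replaces the space's share list with an empty entry but keeps the previous etag, so the Persist call that follows can overwrite the stored state instead of being rejected as stale. That carry-over in isolation, with simplified stand-in types:

type sharesEntry struct {
	Etag   string
	Shares map[string]string
}

// emptiedWithEtag builds the empty replacement entry while preserving the etag
// of the old entry, mirroring the PurgeSpace logic above.
func emptiedWithEtag(old *sharesEntry) *sharesEntry {
	fresh := &sharesEntry{Shares: map[string]string{}}
	if old != nil {
		fresh.Etag = old.Etag
	}
	return fresh
}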
func (c *Cache) syncWithLock(ctx context.Context, storageID, spaceID string) error {
ctx, span := tracer.Start(ctx, "syncWithLock")
defer span.End()


@@ -185,6 +185,74 @@ func (c *Cache) Get(ctx context.Context, userID, spaceID, shareID string) (*Stat
return rss.Spaces[spaceID].States[shareID], nil
}
// Remove removes an entry from the cache
func (c *Cache) Remove(ctx context.Context, userID, spaceID, shareID string) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Grab lock")
unlock := c.lockUser(userID)
span.End()
span.SetAttributes(attribute.String("cs3.userid", userID))
defer unlock()
ctx, span = appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Add")
defer span.End()
span.SetAttributes(attribute.String("cs3.userid", userID), attribute.String("cs3.spaceid", spaceID))
persistFunc := func() error {
c.initializeIfNeeded(userID, spaceID)
rss, _ := c.ReceivedSpaces.Load(userID)
receivedSpace := rss.Spaces[spaceID]
if receivedSpace.States == nil {
receivedSpace.States = map[string]*State{}
}
delete(receivedSpace.States, shareID)
if len(receivedSpace.States) == 0 {
delete(rss.Spaces, spaceID)
}
return c.persist(ctx, userID)
}
log := appctx.GetLogger(ctx).With().
Str("hostname", os.Getenv("HOSTNAME")).
Str("userID", userID).
Str("spaceID", spaceID).Logger()
var err error
for retries := 100; retries > 0; retries-- {
err = persistFunc()
switch err.(type) {
case nil:
span.SetStatus(codes.Ok, "")
return nil
case errtypes.Aborted:
log.Debug().Msg("aborted when persisting added received share: etag changed. retrying...")
// this is the expected status code from the server when the if-match etag check fails
// continue with sync below
case errtypes.PreconditionFailed:
log.Debug().Msg("precondition failed when persisting added received share: etag changed. retrying...")
// actually, this is the wrong status code and we treat it like errtypes.Aborted because of inconsistencies on the server side
// continue with sync below
case errtypes.AlreadyExists:
log.Debug().Msg("already exists when persisting added received share. retrying...")
// CS3 uses an already exists error instead of precondition failed when using an If-None-Match=* header / IfExists flag in the InitiateFileUpload call.
// That happens when the cache thinks there is no file.
// continue with sync below
default:
span.SetStatus(codes.Error, fmt.Sprintf("persisting added received share failed. giving up: %s", err.Error()))
log.Error().Err(err).Msg("persisting added received share failed")
return err
}
if err := c.syncWithLock(ctx, userID); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.Error().Err(err).Msg("persisting added received share failed. giving up.")
return err
}
}
return err
}
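
The new Remove follows the same optimistic-concurrency pattern as the other cache writers: attempt to persist, and when the server signals an etag mismatch (Aborted, PreconditionFailed, or the AlreadyExists case noted above) re-sync the local copy and try again, up to 100 times. The skeleton of that loop, with a caller-supplied predicate standing in for those error types:

// persistWithRetry retries persist as long as the failure is classified as
// retryable, re-syncing the local state before every retry.
func persistWithRetry(persist, sync func() error, retryable func(error) bool, retries int) error {
	var err error
	for i := 0; i < retries; i++ {
		if err = persist(); err == nil {
			return nil
		}
		if !retryable(err) {
			return err // unexpected failure: give up immediately
		}
		if serr := sync(); serr != nil {
			return serr // refreshing the local copy failed: give up
		}
	}
	return err
}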
// List returns a list of received shares for a given user
// The return list is guaranteed to be thread-safe
func (c *Cache) List(ctx context.Context, userID string) (map[string]*Space, error) {


@@ -214,6 +214,11 @@ func (c *Cache) Remove(ctx context.Context, userid, shareID string) error {
log.Debug().Msg("precondition failed when persisting removed share: etag changed. retrying...")
// actually, this is the wrong status code and we treat it like errtypes.Aborted because of inconsistencies on the server side
// continue with sync below
case errtypes.AlreadyExists:
log.Debug().Msg("file already existed when persisting removed share. retrying...")
// CS3 uses an already exists error instead of precondition failed when using an If-None-Match=* header / IfExists flag in the InitiateFileUpload call.
// That happens when the cache thinks there is no file.
// continue with sync below
default:
span.SetStatus(codes.Error, fmt.Sprintf("persisting removed share failed. giving up: %s", err.Error()))
log.Error().Err(err).Msg("persisting removed share failed")


@@ -36,7 +36,7 @@ func init() {
// New returns an implementation to of the storage.FS interface that talk to
// a local filesystem.
func New(m map[string]interface{}, stream events.Stream, _ *zerolog.Logger) (storage.FS, error) {
func New(m map[string]interface{}, stream events.Stream, log *zerolog.Logger) (storage.FS, error) {
o, err := options.New(m)
if err != nil {
return nil, err
@@ -47,5 +47,5 @@ func New(m map[string]interface{}, stream events.Stream, _ *zerolog.Logger) (sto
return nil, err
}
return decomposedfs.NewDefault(m, bs, stream)
return decomposedfs.NewDefault(m, bs, stream, log)
}


@@ -134,7 +134,7 @@ func New(m map[string]interface{}, stream events.Stream, log *zerolog.Logger) (s
Trashbin: trashbin,
}
dfs, err := decomposedfs.New(&o.Options, aspects)
dfs, err := decomposedfs.New(&o.Options, aspects, log)
if err != nil {
return nil, err
}


@@ -704,6 +704,8 @@ func (t *Tree) ResolveSpaceIDIndexEntry(spaceid, entry string) (string, string,
// InitNewNode initializes a new node
func (t *Tree) InitNewNode(ctx context.Context, n *node.Node, fsize uint64) (metadata.UnlockFunc, error) {
_, span := tracer.Start(ctx, "InitNewNode")
defer span.End()
// create folder structure (if needed)
if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil {
return nil, err


@@ -35,7 +35,7 @@ func init() {
// New returns an implementation to of the storage.FS interface that talk to
// a local filesystem.
func New(m map[string]interface{}, stream events.Stream, _ *zerolog.Logger) (storage.FS, error) {
func New(m map[string]interface{}, stream events.Stream, log *zerolog.Logger) (storage.FS, error) {
o, err := parseConfig(m)
if err != nil {
return nil, err
@@ -59,5 +59,5 @@ func New(m map[string]interface{}, stream events.Stream, _ *zerolog.Logger) (sto
return nil, err
}
return decomposedfs.NewDefault(m, bs, stream)
return decomposedfs.NewDefault(m, bs, stream, log)
}


@@ -35,6 +35,7 @@ import (
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/jellydator/ttlcache/v2"
"github.com/pkg/errors"
"github.com/rs/zerolog"
tusd "github.com/tus/tusd/v2/pkg/handler"
microstore "go-micro.dev/v4/store"
"go.opentelemetry.io/otel"
@@ -125,10 +126,12 @@ type Decomposedfs struct {
userSpaceIndex *spaceidindex.Index
groupSpaceIndex *spaceidindex.Index
spaceTypeIndex *spaceidindex.Index
log *zerolog.Logger
}
// NewDefault returns an instance with default components
func NewDefault(m map[string]interface{}, bs tree.Blobstore, es events.Stream) (storage.FS, error) {
func NewDefault(m map[string]interface{}, bs tree.Blobstore, es events.Stream, log *zerolog.Logger) (storage.FS, error) {
o, err := options.New(m)
if err != nil {
return nil, err
@@ -169,14 +172,12 @@ func NewDefault(m map[string]interface{}, bs tree.Blobstore, es events.Stream) (
Trashbin: &DecomposedfsTrashbin{},
}
return New(o, aspects)
return New(o, aspects, log)
}
// New returns an implementation of the storage.FS interface that talks to
// a local filesystem.
func New(o *options.Options, aspects aspects.Aspects) (storage.FS, error) {
log := logger.New()
func New(o *options.Options, aspects aspects.Aspects, log *zerolog.Logger) (storage.FS, error) {
err := aspects.Tree.Setup()
if err != nil {
log.Error().Err(err).Msg("could not setup tree")
@@ -235,6 +236,7 @@ func New(o *options.Options, aspects aspects.Aspects) (storage.FS, error) {
userSpaceIndex: userSpaceIndex,
groupSpaceIndex: groupSpaceIndex,
spaceTypeIndex: spaceTypeIndex,
log: log,
}
fs.sessionStore = upload.NewSessionStore(fs, aspects, o.Root, o.AsyncFileUploads, o.Tokens)
if err = fs.trashbin.Setup(fs); err != nil {
@@ -311,7 +313,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
keepUpload = true
metrics.UploadSessionsAborted.Inc()
case events.PPOutcomeContinue:
if err := session.Finalize(); err != nil {
if err := session.Finalize(ctx); err != nil {
sublog.Error().Err(err).Msg("could not finalize upload")
failed = true
revertNodeMetadata = false


@@ -38,6 +38,8 @@ import (
// DenyGrant denies access to a resource.
func (fs *Decomposedfs) DenyGrant(ctx context.Context, ref *provider.Reference, grantee *provider.Grantee) error {
_, span := tracer.Start(ctx, "DenyGrant")
defer span.End()
log := appctx.GetLogger(ctx)
log.Debug().Interface("ref", ref).Interface("grantee", grantee).Msg("DenyGrant()")
@@ -74,6 +76,8 @@ func (fs *Decomposedfs) DenyGrant(ctx context.Context, ref *provider.Reference,
// AddGrant adds a grant to a resource
func (fs *Decomposedfs) AddGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) (err error) {
_, span := tracer.Start(ctx, "AddGrant")
defer span.End()
log := appctx.GetLogger(ctx)
log.Debug().Interface("ref", ref).Interface("grant", g).Msg("AddGrant()")
grantNode, unlockFunc, grant, err := fs.loadGrant(ctx, ref, g)
@@ -119,6 +123,8 @@ func (fs *Decomposedfs) AddGrant(ctx context.Context, ref *provider.Reference, g
// ListGrants lists the grants on the specified resource
func (fs *Decomposedfs) ListGrants(ctx context.Context, ref *provider.Reference) (grants []*provider.Grant, err error) {
_, span := tracer.Start(ctx, "ListGrants")
defer span.End()
var grantNode *node.Node
if grantNode, err = fs.lu.NodeFromResource(ctx, ref); err != nil {
return
@@ -174,6 +180,8 @@ func (fs *Decomposedfs) ListGrants(ctx context.Context, ref *provider.Reference)
// RemoveGrant removes a grant from resource
func (fs *Decomposedfs) RemoveGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) (err error) {
_, span := tracer.Start(ctx, "RemoveGrant")
defer span.End()
grantNode, unlockFunc, grant, err := fs.loadGrant(ctx, ref, g)
if err != nil {
return err
@@ -235,6 +243,8 @@ func isShareGrant(ctx context.Context) bool {
// UpdateGrant updates a grant on a resource
// TODO remove AddGrant or UpdateGrant grant from CS3 api, redundant? tracked in https://github.com/cs3org/cs3apis/issues/92
func (fs *Decomposedfs) UpdateGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) error {
_, span := tracer.Start(ctx, "UpdateGrant")
defer span.End()
log := appctx.GetLogger(ctx)
log.Debug().Interface("ref", ref).Interface("grant", g).Msg("UpdateGrant()")
@@ -272,6 +282,8 @@ func (fs *Decomposedfs) UpdateGrant(ctx context.Context, ref *provider.Reference
// checks if the given grant exists and returns it. Nil grant means it doesn't exist
func (fs *Decomposedfs) loadGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) (*node.Node, metadata.UnlockFunc, *provider.Grant, error) {
_, span := tracer.Start(ctx, "loadGrant")
defer span.End()
n, err := fs.lu.NodeFromResource(ctx, ref)
if err != nil {
return nil, nil, nil, err
@@ -308,6 +320,8 @@ func (fs *Decomposedfs) loadGrant(ctx context.Context, ref *provider.Reference,
}
func (fs *Decomposedfs) storeGrant(ctx context.Context, n *node.Node, g *provider.Grant) error {
_, span := tracer.Start(ctx, "storeGrant")
defer span.End()
// if is a grant to a space root, the receiver needs the space type to update the indexes
spaceType, ok := storageprovider.SpaceTypeFromContext(ctx)
if !ok {


@@ -38,6 +38,8 @@ import (
// SetArbitraryMetadata sets the metadata on a resource
func (fs *Decomposedfs) SetArbitraryMetadata(ctx context.Context, ref *provider.Reference, md *provider.ArbitraryMetadata) (err error) {
_, span := tracer.Start(ctx, "SetArbitraryMetadata")
defer span.End()
n, err := fs.lu.NodeFromResource(ctx, ref)
if err != nil {
return errors.Wrap(err, "Decomposedfs: error resolving ref")
@@ -131,6 +133,8 @@ func (fs *Decomposedfs) SetArbitraryMetadata(ctx context.Context, ref *provider.
// UnsetArbitraryMetadata unsets the metadata on the given resource
func (fs *Decomposedfs) UnsetArbitraryMetadata(ctx context.Context, ref *provider.Reference, keys []string) (err error) {
_, span := tracer.Start(ctx, "UnsetArbitraryMetadata")
defer span.End()
n, err := fs.lu.NodeFromResource(ctx, ref)
if err != nil {
return errors.Wrap(err, "Decomposedfs: error resolving ref")


@@ -34,3 +34,12 @@ func (m *Map[K, V]) Range(f func(key K, value V) bool) {
}
func (m *Map[K, V]) Store(key K, value V) { m.m.Store(key, value) }
func (m *Map[K, V]) Count() int {
l := 0
m.Range(func(_ K, _ V) bool {
l++
return true
})
return l
}
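
Count is a small helper on the generic map that simply ranges over all entries. A standalone equivalent on a plain sync.Map (the wrapped type is assumed to behave similarly):

package sketch

import "sync"

// count tallies the entries of a sync.Map by ranging over it, the same approach
// the Count helper above uses on the generic wrapper.
func count(m *sync.Map) int {
	n := 0
	m.Range(func(_, _ any) bool {
		n++
		return true
	})
	return n
}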


@@ -38,6 +38,8 @@ import (
// SetLock sets a lock on the node
func (n *Node) SetLock(ctx context.Context, lock *provider.Lock) error {
ctx, span := tracer.Start(ctx, "SetLock")
defer span.End()
lockFilePath := n.LockFilePath()
// ensure parent path exists
@@ -89,22 +91,31 @@ func (n *Node) SetLock(ctx context.Context, lock *provider.Lock) error {
// ReadLock reads the lock id for a node
func (n Node) ReadLock(ctx context.Context, skipFileLock bool) (*provider.Lock, error) {
ctx, span := tracer.Start(ctx, "ReadLock")
defer span.End()
// ensure parent path exists
if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil {
_, subspan := tracer.Start(ctx, "os.MkdirAll")
err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700)
subspan.End()
if err != nil {
return nil, errors.Wrap(err, "Decomposedfs: error creating parent folder for lock")
}
// the caller of ReadLock already may hold a file lock
if !skipFileLock {
_, subspan := tracer.Start(ctx, "filelocks.AcquireReadLock")
fileLock, err := filelocks.AcquireReadLock(n.InternalPath())
subspan.End()
if err != nil {
return nil, err
}
defer func() {
_, subspan := tracer.Start(ctx, "filelocks.ReleaseLock")
rerr := filelocks.ReleaseLock(fileLock)
subspan.End()
// if err is non nil we do not overwrite that
if err == nil {
@@ -113,7 +124,10 @@ func (n Node) ReadLock(ctx context.Context, skipFileLock bool) (*provider.Lock,
}()
}
_, subspan = tracer.Start(ctx, "os.Open")
f, err := os.Open(n.LockFilePath())
subspan.End()
if err != nil {
if errors.Is(err, fs.ErrNotExist) {
return nil, errtypes.NotFound("no lock found")
@@ -130,7 +144,11 @@ func (n Node) ReadLock(ctx context.Context, skipFileLock bool) (*provider.Lock,
// lock already expired
if lock.Expiration != nil && time.Now().After(time.Unix(int64(lock.Expiration.Seconds), int64(lock.Expiration.Nanos))) {
if err = os.Remove(f.Name()); err != nil {
_, subspan = tracer.Start(ctx, "os.Remove")
err = os.Remove(f.Name())
subspan.End()
if err != nil {
return nil, errors.Wrap(err, "Decomposedfs: could not remove expired lock file")
}
// we successfully deleted the expired lock
@@ -142,6 +160,8 @@ func (n Node) ReadLock(ctx context.Context, skipFileLock bool) (*provider.Lock,
// RefreshLock refreshes the node's lock
func (n *Node) RefreshLock(ctx context.Context, lock *provider.Lock, existingLockID string) error {
ctx, span := tracer.Start(ctx, "RefreshLock")
defer span.End()
// ensure parent path exists
if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil {
@@ -204,6 +224,8 @@ func (n *Node) RefreshLock(ctx context.Context, lock *provider.Lock, existingLoc
// Unlock unlocks the node
func (n *Node) Unlock(ctx context.Context, lock *provider.Lock) error {
ctx, span := tracer.Start(ctx, "Unlock")
defer span.End()
// ensure parent path exists
if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil {


@@ -218,6 +218,8 @@ func (n *Node) MarshalJSON() ([]byte, error) {
// Type returns the node's resource type
func (n *Node) Type(ctx context.Context) provider.ResourceType {
_, span := tracer.Start(ctx, "Type")
defer span.End()
if n.nodeType != nil {
return *n.nodeType
}
@@ -446,6 +448,8 @@ func (n *Node) Child(ctx context.Context, name string) (*Node, error) {
// ParentWithReader returns the parent node
func (n *Node) ParentWithReader(ctx context.Context, r io.Reader) (*Node, error) {
_, span := tracer.Start(ctx, "ParentWithReader")
defer span.End()
if n.ParentID == "" {
return nil, fmt.Errorf("decomposedfs: root has no parent")
}
@@ -1261,8 +1265,7 @@ func (n *Node) ProcessingID(ctx context.Context) (string, error) {
// IsSpaceRoot checks if the node is a space root
func (n *Node) IsSpaceRoot(ctx context.Context) bool {
_, err := n.Xattr(ctx, prefixes.SpaceNameAttr)
return err == nil
return n.ID == n.SpaceID
}
// SetScanData sets the virus scan info to the node


@@ -68,6 +68,8 @@ func (md Attributes) Time(key string) (time.Time, error) {
// SetXattrs sets multiple extended attributes on the write-through cache/node
func (n *Node) SetXattrsWithContext(ctx context.Context, attribs map[string][]byte, acquireLock bool) (err error) {
_, span := tracer.Start(ctx, "SetXattrsWithContext")
defer span.End()
if n.xattrsCache != nil {
for k, v := range attribs {
n.xattrsCache[k] = v


@@ -60,6 +60,8 @@ func (p Permissions) AssemblePermissions(ctx context.Context, n *node.Node) (*pr
// AssembleTrashPermissions is used to assemble file permissions
func (p Permissions) AssembleTrashPermissions(ctx context.Context, n *node.Node) (*provider.ResourcePermissions, error) {
_, span := tracer.Start(ctx, "AssembleTrashPermissions")
defer span.End()
return p.item.AssembleTrashPermissions(ctx, n)
}
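
Most of the surrounding hunks only add instrumentation: start a span at the top of the function and defer span.End() so the span is closed on every return path, and, in later hunks, thread the caller's context into helpers such as Finalize so their spans nest under the request trace instead of starting detached from context.Background(). A minimal sketch of that pattern using the OpenTelemetry API (the function and tracer names are illustrative):

package sketch

import (
	"context"

	"go.opentelemetry.io/otel"
)

func assembleSomething(ctx context.Context) error {
	// child span of whatever trace ctx already carries; the deferred End()
	// closes it on every return path.
	ctx, span := otel.Tracer("decomposedfs").Start(ctx, "AssembleSomething")
	defer span.End()

	_ = ctx // hand ctx on to nested calls so their spans attach below this one
	return nil
}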


@@ -64,7 +64,8 @@ func (tb *DecomposedfsTrashbin) Setup(fs storage.FS) error {
// ListRecycle returns the list of available recycle items
// ref -> the space (= resourceid), key -> deleted node id, relativePath = relative to key
func (tb *DecomposedfsTrashbin) ListRecycle(ctx context.Context, ref *provider.Reference, key, relativePath string) ([]*provider.RecycleItem, error) {
_, span := tracer.Start(ctx, "ListRecycle")
defer span.End()
if ref == nil || ref.ResourceId == nil || ref.ResourceId.OpaqueId == "" {
return nil, errtypes.BadRequest("spaceid required")
}
@@ -346,6 +347,8 @@ func (tb *DecomposedfsTrashbin) listTrashRoot(ctx context.Context, spaceID strin
// RestoreRecycleItem restores the specified item
func (tb *DecomposedfsTrashbin) RestoreRecycleItem(ctx context.Context, ref *provider.Reference, key, relativePath string, restoreRef *provider.Reference) error {
_, span := tracer.Start(ctx, "RestoreRecycleItem")
defer span.End()
if ref == nil {
return errtypes.BadRequest("missing reference, needs a space id")
}
@@ -399,6 +402,8 @@ func (tb *DecomposedfsTrashbin) RestoreRecycleItem(ctx context.Context, ref *pro
// PurgeRecycleItem purges the specified item, all its children and all their revisions
func (tb *DecomposedfsTrashbin) PurgeRecycleItem(ctx context.Context, ref *provider.Reference, key, relativePath string) error {
_, span := tracer.Start(ctx, "PurgeRecycleItem")
defer span.End()
if ref == nil {
return errtypes.BadRequest("missing reference, needs a space id")
}
@@ -429,6 +434,8 @@ func (tb *DecomposedfsTrashbin) PurgeRecycleItem(ctx context.Context, ref *provi
// EmptyRecycle empties the trash
func (tb *DecomposedfsTrashbin) EmptyRecycle(ctx context.Context, ref *provider.Reference) error {
_, span := tracer.Start(ctx, "EmptyRecycle")
defer span.End()
if ref == nil || ref.ResourceId == nil || ref.ResourceId.OpaqueId == "" {
return errtypes.BadRequest("spaceid must be set")
}


@@ -48,6 +48,8 @@ import (
// ListRevisions lists the revisions of the given resource
func (fs *Decomposedfs) ListRevisions(ctx context.Context, ref *provider.Reference) (revisions []*provider.FileVersion, err error) {
_, span := tracer.Start(ctx, "ListRevisions")
defer span.End()
var n *node.Node
if n, err = fs.lu.NodeFromResource(ctx, ref); err != nil {
return
@@ -115,6 +117,8 @@ func (fs *Decomposedfs) ListRevisions(ctx context.Context, ref *provider.Referen
// DownloadRevision returns a reader for the specified revision
// FIXME the CS3 api should explicitly allow initiating revision and trash download, a related issue is https://github.com/cs3org/reva/issues/1813
func (fs *Decomposedfs) DownloadRevision(ctx context.Context, ref *provider.Reference, revisionKey string, openReaderFunc func(md *provider.ResourceInfo) bool) (*provider.ResourceInfo, io.ReadCloser, error) {
_, span := tracer.Start(ctx, "DownloadRevision")
defer span.End()
log := appctx.GetLogger(ctx)
// verify revision key format
@@ -186,6 +190,8 @@ func (fs *Decomposedfs) DownloadRevision(ctx context.Context, ref *provider.Refe
// RestoreRevision restores the specified revision of the resource
func (fs *Decomposedfs) RestoreRevision(ctx context.Context, ref *provider.Reference, revisionKey string) (returnErr error) {
_, span := tracer.Start(ctx, "RestoreRevision")
defer span.End()
log := appctx.GetLogger(ctx)
// verify revision key format
@@ -330,6 +336,8 @@ func (fs *Decomposedfs) RestoreRevision(ctx context.Context, ref *provider.Refer
// DeleteRevision deletes the specified revision of the resource
func (fs *Decomposedfs) DeleteRevision(ctx context.Context, ref *provider.Reference, revisionKey string) error {
_, span := tracer.Start(ctx, "DeleteRevision")
defer span.End()
n, err := fs.getRevisionNode(ctx, ref, revisionKey, func(rp *provider.ResourcePermissions) bool {
return rp.RestoreFileVersion
})
@@ -345,6 +353,8 @@ func (fs *Decomposedfs) DeleteRevision(ctx context.Context, ref *provider.Refere
}
func (fs *Decomposedfs) getRevisionNode(ctx context.Context, ref *provider.Reference, revisionKey string, hasPermission func(*provider.ResourcePermissions) bool) (*node.Node, error) {
_, span := tracer.Start(ctx, "getRevisionNode")
defer span.End()
log := appctx.GetLogger(ctx)
// verify revision key format


@@ -183,7 +183,7 @@ func (p AsyncPropagator) queuePropagation(ctx context.Context, spaceID, nodeID s
ready = true
break
}
log.Error().Err(err).Msg("failed to write Change to disk (retrying)")
log.Debug().Err(err).Msg("failed to write Change to disk (retrying)")
err = os.Mkdir(filepath.Dir(changePath), 0700)
triggerPropagation = err == nil || os.IsExist(err) // only the first goroutine, which succeeds to create the directory, is supposed to actually trigger the propagation
}
@@ -386,7 +386,7 @@ func (p AsyncPropagator) propagate(ctx context.Context, spaceID, nodeID string,
// a negative new treesize. Something must have gone wrong with the accounting.
// Reset the current treesize to 0.
log.Error().Uint64("treeSize", treeSize).Int64("sizeDiff", pc.SizeDiff).
Msg("Error when updating treesize of node. Updated treesize < 0. Reestting to 0")
Msg("Error when updating treesize of node. Updated treesize < 0. Resetting to 0")
newSize = 0
default:
newSize = treeSize - uint64(-pc.SizeDiff)
@@ -414,7 +414,7 @@ func (p AsyncPropagator) propagate(ctx context.Context, spaceID, nodeID string,
log.Info().Msg("Propagation done. cleaning up")
cleanup()
if !n.IsSpaceRoot(ctx) { // This does not seem robust as it checks the space name property
if !n.IsSpaceRoot(ctx) {
p.queuePropagation(ctx, n.SpaceID, n.ParentID, pc, log)
}


@@ -106,6 +106,8 @@ func (t *Tree) Setup() error {
// GetMD returns the metadata of a node in the tree
func (t *Tree) GetMD(ctx context.Context, n *node.Node) (os.FileInfo, error) {
_, span := tracer.Start(ctx, "GetMD")
defer span.End()
md, err := os.Stat(n.InternalPath())
if err != nil {
if errors.Is(err, fs.ErrNotExist) {
@@ -119,6 +121,8 @@ func (t *Tree) GetMD(ctx context.Context, n *node.Node) (os.FileInfo, error) {
// TouchFile creates a new empty file
func (t *Tree) TouchFile(ctx context.Context, n *node.Node, markprocessing bool, mtime string) error {
_, span := tracer.Start(ctx, "TouchFile")
defer span.End()
if n.Exists {
if markprocessing {
return n.SetXattr(ctx, prefixes.StatusPrefix, []byte(node.ProcessingStatus))
@@ -223,6 +227,8 @@ func (t *Tree) CreateDir(ctx context.Context, n *node.Node) (err error) {
// Move replaces the target with the source
func (t *Tree) Move(ctx context.Context, oldNode *node.Node, newNode *node.Node) (err error) {
_, span := tracer.Start(ctx, "Move")
defer span.End()
if oldNode.SpaceID != newNode.SpaceID {
// WebDAV RFC https://www.rfc-editor.org/rfc/rfc4918#section-9.9.4 says to use
// > 502 (Bad Gateway) - This may occur when the destination is on another
@@ -432,6 +438,8 @@ func (t *Tree) ListFolder(ctx context.Context, n *node.Node) ([]*node.Node, erro
// Delete deletes a node in the tree by moving it to the trash
func (t *Tree) Delete(ctx context.Context, n *node.Node) (err error) {
_, span := tracer.Start(ctx, "Delete")
defer span.End()
path := filepath.Join(n.ParentPath(), n.Name)
// remove entry from cache immediately to avoid inconsistencies
defer func() { _ = t.idCache.Delete(path) }()
@@ -524,6 +532,8 @@ func (t *Tree) Delete(ctx context.Context, n *node.Node) (err error) {
// RestoreRecycleItemFunc returns a node and a function to restore it from the trash.
func (t *Tree) RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPath string, targetNode *node.Node) (*node.Node, *node.Node, func() error, error) {
_, span := tracer.Start(ctx, "RestoreRecycleItemFunc")
defer span.End()
logger := appctx.GetLogger(ctx)
recycleNode, trashItem, deletedNodePath, origin, err := t.readRecycleItem(ctx, spaceid, key, trashPath)
@@ -623,6 +633,8 @@ func (t *Tree) RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPa
// PurgeRecycleItemFunc returns a node and a function to purge it from the trash
func (t *Tree) PurgeRecycleItemFunc(ctx context.Context, spaceid, key string, path string) (*node.Node, func() error, error) {
_, span := tracer.Start(ctx, "PurgeRecycleItemFunc")
defer span.End()
logger := appctx.GetLogger(ctx)
rn, trashItem, deletedNodePath, _, err := t.readRecycleItem(ctx, spaceid, key, path)
@@ -664,25 +676,38 @@ func (t *Tree) PurgeRecycleItemFunc(ctx context.Context, spaceid, key string, pa
// InitNewNode initializes a new node
func (t *Tree) InitNewNode(ctx context.Context, n *node.Node, fsize uint64) (metadata.UnlockFunc, error) {
_, span := tracer.Start(ctx, "InitNewNode")
defer span.End()
// create folder structure (if needed)
if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil {
_, subspan := tracer.Start(ctx, "os.MkdirAll")
err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700)
subspan.End()
if err != nil {
return nil, err
}
// create and write lock new node metadata
_, subspan = tracer.Start(ctx, "metadata.Lock")
unlock, err := t.lookup.MetadataBackend().Lock(n.InternalPath())
subspan.End()
if err != nil {
return nil, err
}
// we also need to touch the actual node file here it stores the mtime of the resource
_, subspan = tracer.Start(ctx, "os.OpenFile")
h, err := os.OpenFile(n.InternalPath(), os.O_CREATE|os.O_EXCL, 0600)
subspan.End()
if err != nil {
return unlock, err
}
h.Close()
if _, err := node.CheckQuota(ctx, n.SpaceRoot, false, 0, fsize); err != nil {
_, subspan = tracer.Start(ctx, "node.CheckQuota")
_, err = node.CheckQuota(ctx, n.SpaceRoot, false, 0, fsize)
subspan.End()
if err != nil {
return unlock, err
}
@@ -692,7 +717,10 @@ func (t *Tree) InitNewNode(ctx context.Context, n *node.Node, fsize uint64) (met
log := appctx.GetLogger(ctx).With().Str("childNameLink", childNameLink).Str("relativeNodePath", relativeNodePath).Logger()
log.Info().Msg("initNewNode: creating symlink")
if err = os.Symlink(relativeNodePath, childNameLink); err != nil {
_, subspan = tracer.Start(ctx, "os.Symlink")
err = os.Symlink(relativeNodePath, childNameLink)
subspan.End()
if err != nil {
log.Info().Err(err).Msg("initNewNode: symlink failed")
if errors.Is(err, fs.ErrExist) {
log.Info().Err(err).Msg("initNewNode: symlink already exists")
@@ -854,6 +882,8 @@ var nodeIDRegep = regexp.MustCompile(`.*/nodes/([^.]*).*`)
// TODO refactor the returned params into Node properties? would make all the path transformations go away...
func (t *Tree) readRecycleItem(ctx context.Context, spaceID, key, path string) (recycleNode *node.Node, trashItem string, deletedNodePath string, origin string, err error) {
_, span := tracer.Start(ctx, "readRecycleItem")
defer span.End()
logger := appctx.GetLogger(ctx)
if key == "" {


@@ -47,6 +47,8 @@ import (
// TODO Upload (and InitiateUpload) needs a way to receive the expected checksum.
// Maybe in metadata as 'checksum' => 'sha1 aeosvp45w5xaeoe' = lowercase, space separated?
func (fs *Decomposedfs) Upload(ctx context.Context, req storage.UploadRequest, uff storage.UploadFinishedFunc) (*provider.ResourceInfo, error) {
_, span := tracer.Start(ctx, "Upload")
defer span.End()
up, err := fs.GetUpload(ctx, req.Ref.GetPath())
if err != nil {
return &provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error retrieving upload")
@@ -130,6 +132,8 @@ func (fs *Decomposedfs) Upload(ctx context.Context, req storage.UploadRequest, u
// TODO read optional content for small files in this request
// TODO InitiateUpload (and Upload) needs a way to receive the expected checksum. Maybe in metadata as 'checksum' => 'sha1 aeosvp45w5xaeoe' = lowercase, space separated?
func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (map[string]string, error) {
_, span := tracer.Start(ctx, "InitiateUpload")
defer span.End()
log := appctx.GetLogger(ctx)
// remember the path from the reference


@@ -81,6 +81,8 @@ func (s *OcisSession) executantUser() *userpb.User {
// Purge deletes the upload session metadata and written binary data
func (s *OcisSession) Purge(ctx context.Context) error {
_, span := tracer.Start(ctx, "Purge")
defer span.End()
sessionPath := sessionPath(s.store.root, s.info.ID)
f, err := lockedfile.OpenFile(sessionPath+".lock", os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0600)
if err != nil {
@@ -112,6 +114,8 @@ func (s *OcisSession) TouchBin() error {
// events can update the scan outcome and the finished event might read an empty file because of race conditions
// so we need to lock the file while writing and use atomic writes
func (s *OcisSession) Persist(ctx context.Context) error {
_, span := tracer.Start(ctx, "Persist")
defer span.End()
sessionPath := sessionPath(s.store.root, s.info.ID)
// create folder structure (if needed)
if err := os.MkdirAll(filepath.Dir(sessionPath), 0700); err != nil {


@@ -199,8 +199,8 @@ func (store OcisStore) Cleanup(ctx context.Context, session Session, revertNodeM
// CreateNodeForUpload will create the target node for the Upload
// TODO move this to the node package as NodeFromUpload?
// should we in InitiateUpload create the node first? and then the upload?
func (store OcisStore) CreateNodeForUpload(session *OcisSession, initAttrs node.Attributes) (*node.Node, error) {
ctx, span := tracer.Start(session.Context(context.Background()), "CreateNodeForUpload")
func (store OcisStore) CreateNodeForUpload(ctx context.Context, session *OcisSession, initAttrs node.Attributes) (*node.Node, error) {
ctx, span := tracer.Start(session.Context(ctx), "CreateNodeForUpload")
defer span.End()
n := node.New(
session.SpaceID(),
@@ -303,6 +303,8 @@ func (store OcisStore) CreateNodeForUpload(session *OcisSession, initAttrs node.
}
func (store OcisStore) updateExistingNode(ctx context.Context, session *OcisSession, n *node.Node, spaceID string, fsize uint64) (metadata.UnlockFunc, error) {
_, span := tracer.Start(ctx, "updateExistingNode")
defer span.End()
targetPath := n.InternalPath()
// write lock existing node before reading any metadata
@@ -388,6 +390,7 @@ func (store OcisStore) updateExistingNode(ctx context.Context, session *OcisSess
}
// clean revision file
span.AddEvent("os.Create")
if _, err := os.Create(versionPath); err != nil {
return unlock, err
}
@@ -405,6 +408,7 @@ func (store OcisStore) updateExistingNode(ctx context.Context, session *OcisSess
}
session.info.MetaData["versionsPath"] = versionPath
// keep mtime from previous version
span.AddEvent("os.Chtimes")
if err := os.Chtimes(session.info.MetaData["versionsPath"], oldNodeMtime, oldNodeMtime); err != nil {
return unlock, errtypes.InternalError(fmt.Sprintf("failed to change mtime of version node: %s", err))
}


@@ -184,7 +184,7 @@ func (session *OcisSession) FinishUploadDecomposed(ctx context.Context) error {
}
}
n, err := session.store.CreateNodeForUpload(session, attrs)
n, err := session.store.CreateNodeForUpload(ctx, session, attrs)
if err != nil {
return err
}
@@ -226,7 +226,7 @@ func (session *OcisSession) FinishUploadDecomposed(ctx context.Context) error {
// for 0-byte uploads we take a shortcut and finalize isn't called elsewhere
if !session.store.async || session.info.Size == 0 {
// handle postprocessing synchronously
err = session.Finalize()
err = session.Finalize(ctx)
session.store.Cleanup(ctx, session, err != nil, false, err == nil)
if err != nil {
log.Error().Err(err).Msg("failed to upload")
@@ -279,8 +279,8 @@ func (session *OcisSession) ConcatUploads(_ context.Context, uploads []tusd.Uplo
}
// Finalize finalizes the upload (eg moves the file to the internal destination)
func (session *OcisSession) Finalize() (err error) {
ctx, span := tracer.Start(session.Context(context.Background()), "Finalize")
func (session *OcisSession) Finalize(ctx context.Context) (err error) {
ctx, span := tracer.Start(session.Context(ctx), "Finalize")
defer span.End()
revisionNode := node.New(session.SpaceID(), session.NodeID(), "", "", session.Size(), session.ID(),


@@ -552,7 +552,11 @@ func (cs3 *CS3) getAuthContext(ctx context.Context) (context.Context, error) {
authCtx, span := tracer.Start(authCtx, "getAuthContext", trace.WithLinks(trace.LinkFromContext(ctx)))
defer span.End()
client, err := pool.GetGatewayServiceClient(cs3.gatewayAddr)
selector, err := pool.GatewaySelector(cs3.gatewayAddr)
if err != nil {
return nil, err
}
client, err := selector.Next()
if err != nil {
return nil, err
}

vendor/modules.txt (vendored, 2 changes)

@@ -367,7 +367,7 @@ github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1
github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1
github.com/cs3org/go-cs3apis/cs3/tx/v1beta1
github.com/cs3org/go-cs3apis/cs3/types/v1beta1
# github.com/cs3org/reva/v2 v2.26.7
# github.com/cs3org/reva/v2 v2.26.8-0.20241203081301-17f339546533
## explicit; go 1.22.0
github.com/cs3org/reva/v2/cmd/revad/internal/grace
github.com/cs3org/reva/v2/cmd/revad/runtime