mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2025-12-31 01:10:20 -06:00
Bump reva
This commit is contained in:
2
go.mod
2
go.mod
@@ -17,7 +17,7 @@ require (
|
||||
github.com/cenkalti/backoff v2.2.1+incompatible
|
||||
github.com/coreos/go-oidc/v3 v3.11.0
|
||||
github.com/cs3org/go-cs3apis v0.0.0-20241105092511-3ad35d174fc1
|
||||
github.com/cs3org/reva/v2 v2.26.7
|
||||
github.com/cs3org/reva/v2 v2.26.8-0.20241203081301-17f339546533
|
||||
github.com/davidbyttow/govips/v2 v2.15.0
|
||||
github.com/dhowden/tag v0.0.0-20240417053706-3d75831295e8
|
||||
github.com/dutchcoders/go-clamd v0.0.0-20170520113014-b970184f4d9e
|
||||
|
||||
2
go.sum
2
go.sum
@@ -257,6 +257,8 @@ github.com/cs3org/go-cs3apis v0.0.0-20241105092511-3ad35d174fc1 h1:RU6LT6mkD16xZ
|
||||
github.com/cs3org/go-cs3apis v0.0.0-20241105092511-3ad35d174fc1/go.mod h1:DedpcqXl193qF/08Y04IO0PpxyyMu8+GrkD6kWK2MEQ=
|
||||
github.com/cs3org/reva/v2 v2.26.7 h1:E5b1+H5ZsnmDgWWS/u3t4PtdmiMaY1bEEYVI/vE9xo8=
|
||||
github.com/cs3org/reva/v2 v2.26.7/go.mod h1:xC5N2XOrCRim/W55uyMsew8RwwFZbQ4hIaKshIbyToo=
|
||||
github.com/cs3org/reva/v2 v2.26.8-0.20241203081301-17f339546533 h1:QshDjljk44ASolJwlHxE9e7u+Slgdi/VfPKYvbfFu2g=
|
||||
github.com/cs3org/reva/v2 v2.26.8-0.20241203081301-17f339546533/go.mod h1:fJWmn7EkttWOWphZfiKdFOcHuthcUsU55aSN1VeTOhU=
|
||||
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
|
||||
github.com/cyphar/filepath-securejoin v0.2.4 h1:Ugdm7cg7i6ZK6x3xDF1oEu1nfkyfH53EtKeQYTC3kyg=
|
||||
github.com/cyphar/filepath-securejoin v0.2.4/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4=
|
||||
|
||||
@@ -520,12 +520,7 @@ func (s *service) UpdateReceivedShare(ctx context.Context, req *collaboration.Up
|
||||
isMountPointSet := slices.Contains(req.GetUpdateMask().GetPaths(), _fieldMaskPathMountPoint) && req.GetShare().GetMountPoint().GetPath() != ""
|
||||
// we calculate a valid mountpoint only if the share should be accepted and the mount point is not set explicitly
|
||||
if isStateTransitionShareAccepted && !isMountPointSet {
|
||||
gatewayClient, err := s.gatewaySelector.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s, err := setReceivedShareMountPoint(ctx, gatewayClient, req)
|
||||
s, err := s.setReceivedShareMountPoint(ctx, req)
|
||||
switch {
|
||||
case err != nil:
|
||||
fallthrough
|
||||
@@ -556,7 +551,11 @@ func (s *service) UpdateReceivedShare(ctx context.Context, req *collaboration.Up
|
||||
}
|
||||
}
|
||||
|
||||
func setReceivedShareMountPoint(ctx context.Context, gwc gateway.GatewayAPIClient, req *collaboration.UpdateReceivedShareRequest) (*rpc.Status, error) {
|
||||
func (s *service) setReceivedShareMountPoint(ctx context.Context, req *collaboration.UpdateReceivedShareRequest) (*rpc.Status, error) {
|
||||
gwc, err := s.gatewaySelector.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
receivedShare, err := gwc.GetReceivedShare(ctx, &collaboration.GetReceivedShareRequest{
|
||||
Ref: &collaboration.ShareReference{
|
||||
Spec: &collaboration.ShareReference_Id{
|
||||
@@ -575,6 +574,10 @@ func setReceivedShareMountPoint(ctx context.Context, gwc gateway.GatewayAPIClien
|
||||
return status.NewOK(ctx), nil
|
||||
}
|
||||
|
||||
gwc, err = s.gatewaySelector.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resourceStat, err := gwc.Stat(ctx, &provider.StatRequest{
|
||||
Ref: &provider.Reference{
|
||||
ResourceId: receivedShare.GetShare().GetShare().GetResourceId(),
|
||||
@@ -592,11 +595,15 @@ func setReceivedShareMountPoint(ctx context.Context, gwc gateway.GatewayAPIClien
|
||||
var userID *userpb.UserId
|
||||
_ = utils.ReadJSONFromOpaque(req.Opaque, "userid", &userID)
|
||||
|
||||
receivedShares, err := s.sm.ListReceivedShares(ctx, []*collaboration.Filter{}, userID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// check if the requested mount point is available and if not, find a suitable one
|
||||
availableMountpoint, _, err := GetMountpointAndUnmountedShares(ctx, gwc,
|
||||
availableMountpoint, _, err := getMountpointAndUnmountedShares(ctx, receivedShares, s.gatewaySelector, nil,
|
||||
resourceStat.GetInfo().GetId(),
|
||||
resourceStat.GetInfo().GetName(),
|
||||
userID,
|
||||
)
|
||||
if err != nil {
|
||||
return status.NewInternal(ctx, err.Error()), nil
|
||||
@@ -620,7 +627,6 @@ func GetMountpointAndUnmountedShares(ctx context.Context, gwc gateway.GatewayAPI
|
||||
if userId != nil {
|
||||
listReceivedSharesReq.Opaque = utils.AppendJSONToOpaque(nil, "userid", userId)
|
||||
}
|
||||
|
||||
listReceivedSharesRes, err := gwc.ListReceivedShares(ctx, listReceivedSharesReq)
|
||||
if err != nil {
|
||||
return "", nil, errtypes.InternalError("grpc list received shares request failed")
|
||||
@@ -630,17 +636,30 @@ func GetMountpointAndUnmountedShares(ctx context.Context, gwc gateway.GatewayAPI
|
||||
return "", nil, err
|
||||
}
|
||||
|
||||
return getMountpointAndUnmountedShares(ctx, listReceivedSharesRes.GetShares(), nil, gwc, id, name)
|
||||
}
|
||||
|
||||
// GetMountpointAndUnmountedShares returns a new or existing mountpoint for the given info and produces a list of unmounted received shares for the same resource
|
||||
func getMountpointAndUnmountedShares(ctx context.Context, receivedShares []*collaboration.ReceivedShare, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], gwc gateway.GatewayAPIClient, id *provider.ResourceId, name string) (string, []*collaboration.ReceivedShare, error) {
|
||||
|
||||
unmountedShares := []*collaboration.ReceivedShare{}
|
||||
base := filepath.Clean(name)
|
||||
mount := base
|
||||
existingMountpoint := ""
|
||||
mountedShares := make([]string, 0, len(listReceivedSharesRes.GetShares()))
|
||||
mountedShares := make([]string, 0, len(receivedShares))
|
||||
var pathExists bool
|
||||
var err error
|
||||
|
||||
for _, s := range listReceivedSharesRes.GetShares() {
|
||||
for _, s := range receivedShares {
|
||||
resourceIDEqual := utils.ResourceIDEqual(s.GetShare().GetResourceId(), id)
|
||||
|
||||
if resourceIDEqual && s.State == collaboration.ShareState_SHARE_STATE_ACCEPTED {
|
||||
if gatewaySelector != nil {
|
||||
gwc, err = gatewaySelector.Next()
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
}
|
||||
// a share to the resource already exists and is mounted, remembers the mount point
|
||||
_, err := utils.GetResourceByID(ctx, s.GetShare().GetResourceId(), gwc)
|
||||
if err == nil {
|
||||
@@ -658,6 +677,12 @@ func GetMountpointAndUnmountedShares(ctx context.Context, gwc gateway.GatewayAPI
|
||||
mountedShares = append(mountedShares, s.GetMountPoint().GetPath())
|
||||
if s.GetMountPoint().GetPath() == mount {
|
||||
// does the shared resource still exist?
|
||||
if gatewaySelector != nil {
|
||||
gwc, err = gatewaySelector.Next()
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
}
|
||||
_, err := utils.GetResourceByID(ctx, s.GetShare().GetResourceId(), gwc)
|
||||
if err == nil {
|
||||
pathExists = true
|
||||
|
||||
2
vendor/github.com/cs3org/reva/v2/internal/http/services/owncloud/ocdav/proppatch.go
generated
vendored
2
vendor/github.com/cs3org/reva/v2/internal/http/services/owncloud/ocdav/proppatch.go
generated
vendored
@@ -133,12 +133,14 @@ func (s *svc) handleProppatch(ctx context.Context, w http.ResponseWriter, r *htt
|
||||
rreq := &provider.UnsetArbitraryMetadataRequest{
|
||||
Ref: ref,
|
||||
ArbitraryMetadataKeys: []string{""},
|
||||
LockId: requestLockToken(r),
|
||||
}
|
||||
sreq := &provider.SetArbitraryMetadataRequest{
|
||||
Ref: ref,
|
||||
ArbitraryMetadata: &provider.ArbitraryMetadata{
|
||||
Metadata: map[string]string{},
|
||||
},
|
||||
LockId: requestLockToken(r),
|
||||
}
|
||||
|
||||
acceptedProps := []xml.Name{}
|
||||
|
||||
156
vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/jsoncs3.go
generated
vendored
156
vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/jsoncs3.go
generated
vendored
@@ -34,6 +34,7 @@ import (
|
||||
"github.com/cs3org/reva/v2/pkg/errtypes"
|
||||
"github.com/cs3org/reva/v2/pkg/events"
|
||||
"github.com/cs3org/reva/v2/pkg/events/stream"
|
||||
"github.com/cs3org/reva/v2/pkg/logger"
|
||||
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
|
||||
"github.com/cs3org/reva/v2/pkg/share"
|
||||
"github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/providercache"
|
||||
@@ -114,6 +115,12 @@ func init() {
|
||||
registry.Register("jsoncs3", NewDefault)
|
||||
}
|
||||
|
||||
var (
|
||||
_registeredEvents = []events.Unmarshaller{
|
||||
events.SpaceDeleted{},
|
||||
}
|
||||
)
|
||||
|
||||
type config struct {
|
||||
GatewayAddr string `mapstructure:"gateway_addr"`
|
||||
MaxConcurrency int `mapstructure:"max_concurrency"`
|
||||
@@ -188,7 +195,8 @@ func NewDefault(m map[string]interface{}) (share.Manager, error) {
|
||||
// New returns a new manager instance.
|
||||
func New(s metadata.Storage, gatewaySelector pool.Selectable[gatewayv1beta1.GatewayAPIClient], ttlSeconds int, es events.Stream, maxconcurrency int) (*Manager, error) {
|
||||
ttl := time.Duration(ttlSeconds) * time.Second
|
||||
return &Manager{
|
||||
|
||||
m := &Manager{
|
||||
Cache: providercache.New(s, ttl),
|
||||
CreatedCache: sharecache.New(s, "users", "created.json", ttl),
|
||||
UserReceivedStates: receivedsharecache.New(s, ttl),
|
||||
@@ -197,7 +205,18 @@ func New(s metadata.Storage, gatewaySelector pool.Selectable[gatewayv1beta1.Gate
|
||||
gatewaySelector: gatewaySelector,
|
||||
eventStream: es,
|
||||
MaxConcurrency: maxconcurrency,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// listen for events
|
||||
if m.eventStream != nil {
|
||||
ch, err := events.Consume(m.eventStream, "jsoncs3sharemanager", _registeredEvents...)
|
||||
if err != nil {
|
||||
appctx.GetLogger(context.Background()).Error().Err(err).Msg("error consuming events")
|
||||
}
|
||||
go m.ProcessEvents(ch)
|
||||
}
|
||||
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func (m *Manager) initialize(ctx context.Context) error {
|
||||
@@ -248,6 +267,22 @@ func (m *Manager) initialize(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Manager) ProcessEvents(ch <-chan events.Event) {
|
||||
log := logger.New()
|
||||
for event := range ch {
|
||||
ctx := context.Background()
|
||||
|
||||
if err := m.initialize(ctx); err != nil {
|
||||
log.Error().Err(err).Msg("error initializing manager")
|
||||
}
|
||||
|
||||
if ev, ok := event.Event.(events.SpaceDeleted); ok {
|
||||
log.Debug().Msgf("space deleted event: %v", ev)
|
||||
go func() { m.purgeSpace(ctx, ev.ID) }()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Share creates a new share
|
||||
func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *collaboration.ShareGrant) (*collaboration.Share, error) {
|
||||
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Share")
|
||||
@@ -420,7 +455,7 @@ func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReferenc
|
||||
return nil, err
|
||||
}
|
||||
if share.IsExpired(s) {
|
||||
if err := m.removeShare(ctx, s); err != nil {
|
||||
if err := m.removeShare(ctx, s, false); err != nil {
|
||||
sublog.Error().Err(err).
|
||||
Msg("failed to unshare expired share")
|
||||
}
|
||||
@@ -485,7 +520,7 @@ func (m *Manager) Unshare(ctx context.Context, ref *collaboration.ShareReference
|
||||
return errtypes.NotFound(ref.String())
|
||||
}
|
||||
|
||||
return m.removeShare(ctx, s)
|
||||
return m.removeShare(ctx, s, false)
|
||||
}
|
||||
|
||||
// UpdateShare updates the mode of the given share.
|
||||
@@ -622,7 +657,7 @@ func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, f
|
||||
resourceID := s.GetResourceId()
|
||||
sublog = sublog.With().Str("storageid", resourceID.GetStorageId()).Str("spaceid", resourceID.GetSpaceId()).Str("opaqueid", resourceID.GetOpaqueId()).Logger()
|
||||
if share.IsExpired(s) {
|
||||
if err := m.removeShare(ctx, s); err != nil {
|
||||
if err := m.removeShare(ctx, s, false); err != nil {
|
||||
sublog.Error().Err(err).
|
||||
Msg("failed to unshare expired share")
|
||||
}
|
||||
@@ -740,7 +775,7 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User,
|
||||
continue
|
||||
}
|
||||
if share.IsExpired(s) {
|
||||
if err := m.removeShare(ctx, s); err != nil {
|
||||
if err := m.removeShare(ctx, s, false); err != nil {
|
||||
sublog.Error().Err(err).
|
||||
Msg("failed to unshare expired share")
|
||||
}
|
||||
@@ -901,12 +936,18 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati
|
||||
}
|
||||
for shareID, state := range w.rspace.States {
|
||||
s, err := m.Cache.Get(ctx, storageID, spaceID, shareID, true)
|
||||
if err != nil || s == nil {
|
||||
if err != nil {
|
||||
sublogr.Error().Err(err).Msg("could not retrieve share")
|
||||
continue
|
||||
}
|
||||
if s == nil {
|
||||
sublogr.Warn().Str("shareid", shareID).Msg("share not found. cleaning up")
|
||||
_ = m.UserReceivedStates.Remove(ctx, user.Id.OpaqueId, w.ssid, shareID)
|
||||
continue
|
||||
}
|
||||
sublogr = sublogr.With().Str("shareid", shareID).Logger()
|
||||
if share.IsExpired(s) {
|
||||
if err := m.removeShare(ctx, s); err != nil {
|
||||
if err := m.removeShare(ctx, s, false); err != nil {
|
||||
sublogr.Error().Err(err).
|
||||
Msg("failed to unshare expired share")
|
||||
}
|
||||
@@ -1009,7 +1050,7 @@ func (m *Manager) getReceived(ctx context.Context, ref *collaboration.ShareRefer
|
||||
return nil, errtypes.NotFound(ref.String())
|
||||
}
|
||||
if share.IsExpired(s) {
|
||||
if err := m.removeShare(ctx, s); err != nil {
|
||||
if err := m.removeShare(ctx, s, false); err != nil {
|
||||
sublog.Error().Err(err).
|
||||
Msg("failed to unshare expired share")
|
||||
}
|
||||
@@ -1136,24 +1177,107 @@ func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Shar
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Manager) removeShare(ctx context.Context, s *collaboration.Share) error {
|
||||
func (m *Manager) purgeSpace(ctx context.Context, id *provider.StorageSpaceId) {
|
||||
log := appctx.GetLogger(ctx)
|
||||
storageID, spaceID := storagespace.SplitStorageID(id.OpaqueId)
|
||||
|
||||
shares, err := m.Cache.ListSpace(ctx, storageID, spaceID)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("error listing shares in space")
|
||||
return
|
||||
}
|
||||
|
||||
// iterate over all shares in the space and remove them
|
||||
for _, share := range shares.Shares {
|
||||
err := m.removeShare(ctx, share, true)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("error removing share")
|
||||
}
|
||||
}
|
||||
|
||||
// remove all shares in the space
|
||||
err = m.Cache.PurgeSpace(ctx, storageID, spaceID)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("error purging space")
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) removeShare(ctx context.Context, s *collaboration.Share, skipSpaceCache bool) error {
|
||||
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "removeShare")
|
||||
defer span.End()
|
||||
|
||||
eg, ctx := errgroup.WithContext(ctx)
|
||||
eg.Go(func() error {
|
||||
storageID, spaceID, _ := shareid.Decode(s.Id.OpaqueId)
|
||||
err := m.Cache.Remove(ctx, storageID, spaceID, s.Id.OpaqueId)
|
||||
if !skipSpaceCache {
|
||||
eg.Go(func() error {
|
||||
storageID, spaceID, _ := shareid.Decode(s.Id.OpaqueId)
|
||||
err := m.Cache.Remove(ctx, storageID, spaceID, s.Id.OpaqueId)
|
||||
|
||||
return err
|
||||
})
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
eg.Go(func() error {
|
||||
// remove from created cache
|
||||
return m.CreatedCache.Remove(ctx, s.GetCreator().GetOpaqueId(), s.Id.OpaqueId)
|
||||
})
|
||||
|
||||
// TODO remove from grantee cache
|
||||
eg.Go(func() error {
|
||||
// remove from user received states
|
||||
if s.GetGrantee().Type == provider.GranteeType_GRANTEE_TYPE_USER {
|
||||
return m.UserReceivedStates.Remove(ctx, s.GetGrantee().GetUserId().GetOpaqueId(), s.GetResourceId().GetStorageId()+shareid.IDDelimiter+s.GetResourceId().GetSpaceId(), s.Id.OpaqueId)
|
||||
} else if s.GetGrantee().Type == provider.GranteeType_GRANTEE_TYPE_GROUP {
|
||||
return m.GroupReceivedCache.Remove(ctx, s.GetGrantee().GetGroupId().GetOpaqueId(), s.Id.OpaqueId)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
return eg.Wait()
|
||||
}
|
||||
|
||||
func (m *Manager) CleanupStaleShares(ctx context.Context) {
|
||||
log := appctx.GetLogger(ctx)
|
||||
|
||||
if err := m.initialize(ctx); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// list all shares
|
||||
providers, err := m.Cache.All(ctx)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("error listing all shares")
|
||||
return
|
||||
}
|
||||
|
||||
client, err := m.gatewaySelector.Next()
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("could not get gateway client")
|
||||
}
|
||||
|
||||
providers.Range(func(storage string, spaces *providercache.Spaces) bool {
|
||||
log.Info().Str("storage", storage).Interface("spaceCount", spaces.Spaces.Count()).Msg("checking storage")
|
||||
|
||||
spaces.Spaces.Range(func(space string, shares *providercache.Shares) bool {
|
||||
log.Info().Str("storage", storage).Str("space", space).Interface("shareCount", len(shares.Shares)).Msg("checking space")
|
||||
|
||||
for _, s := range shares.Shares {
|
||||
req := &provider.StatRequest{
|
||||
Ref: &provider.Reference{ResourceId: s.ResourceId, Path: "."},
|
||||
}
|
||||
res, err := client.Stat(ctx, req)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Str("storage", storage).Str("space", space).Msg("could not stat shared resource")
|
||||
}
|
||||
if res.Status.Code == rpcv1beta1.Code_CODE_NOT_FOUND {
|
||||
log.Info().Str("storage", storage).Str("space", space).Msg("shared resource does not exist anymore. cleaning up shares")
|
||||
if err := m.removeShare(ctx, s, false); err != nil {
|
||||
log.Error().Err(err).Str("storage", storage).Str("space", space).Msg("could not remove share")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
})
|
||||
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ import (
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -319,6 +320,36 @@ func (c *Cache) Get(ctx context.Context, storageID, spaceID, shareID string, ski
|
||||
return space.Shares[shareID], nil
|
||||
}
|
||||
|
||||
// All returns all entries in the storage
|
||||
func (c *Cache) All(ctx context.Context) (*mtimesyncedcache.Map[string, *Spaces], error) {
|
||||
ctx, span := tracer.Start(ctx, "All")
|
||||
defer span.End()
|
||||
|
||||
providers, err := c.storage.ListDir(ctx, "/storages")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, provider := range providers {
|
||||
storageID := provider.Name
|
||||
spaces, err := c.storage.ListDir(ctx, path.Join("/storages", storageID))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, space := range spaces {
|
||||
spaceID := strings.TrimSuffix(space.Name, ".json")
|
||||
|
||||
unlock := c.LockSpace(spaceID)
|
||||
span.AddEvent("got lock for space " + spaceID)
|
||||
if err := c.syncWithLock(ctx, storageID, spaceID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
unlock()
|
||||
}
|
||||
}
|
||||
|
||||
return &c.Providers, nil
|
||||
}
|
||||
|
||||
// ListSpace returns the list of shares in a given space
|
||||
func (c *Cache) ListSpace(ctx context.Context, storageID, spaceID string) (*Shares, error) {
|
||||
ctx, span := tracer.Start(ctx, "ListSpace")
|
||||
@@ -418,6 +449,35 @@ func (c *Cache) Persist(ctx context.Context, storageID, spaceID string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// PurgeSpace removes a space from the cache
|
||||
func (c *Cache) PurgeSpace(ctx context.Context, storageID, spaceID string) error {
|
||||
ctx, span := tracer.Start(ctx, "PurgeSpace")
|
||||
defer span.End()
|
||||
|
||||
unlock := c.LockSpace(spaceID)
|
||||
defer unlock()
|
||||
span.AddEvent("got lock")
|
||||
|
||||
if !c.isSpaceCached(storageID, spaceID) {
|
||||
err := c.syncWithLock(ctx, storageID, spaceID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
spaces, ok := c.Providers.Load(storageID)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
newShares := &Shares{}
|
||||
if space, ok := spaces.Spaces.Load(spaceID); ok {
|
||||
newShares.Etag = space.Etag // keep the etag to allow overwriting the state on the server
|
||||
}
|
||||
spaces.Spaces.Store(spaceID, newShares)
|
||||
|
||||
return c.Persist(ctx, storageID, spaceID)
|
||||
}
|
||||
|
||||
func (c *Cache) syncWithLock(ctx context.Context, storageID, spaceID string) error {
|
||||
ctx, span := tracer.Start(ctx, "syncWithLock")
|
||||
defer span.End()
|
||||
|
||||
@@ -185,6 +185,74 @@ func (c *Cache) Get(ctx context.Context, userID, spaceID, shareID string) (*Stat
|
||||
return rss.Spaces[spaceID].States[shareID], nil
|
||||
}
|
||||
|
||||
// Remove removes an entry from the cache
|
||||
func (c *Cache) Remove(ctx context.Context, userID, spaceID, shareID string) error {
|
||||
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Grab lock")
|
||||
unlock := c.lockUser(userID)
|
||||
span.End()
|
||||
span.SetAttributes(attribute.String("cs3.userid", userID))
|
||||
defer unlock()
|
||||
|
||||
ctx, span = appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Add")
|
||||
defer span.End()
|
||||
span.SetAttributes(attribute.String("cs3.userid", userID), attribute.String("cs3.spaceid", spaceID))
|
||||
|
||||
persistFunc := func() error {
|
||||
c.initializeIfNeeded(userID, spaceID)
|
||||
|
||||
rss, _ := c.ReceivedSpaces.Load(userID)
|
||||
receivedSpace := rss.Spaces[spaceID]
|
||||
if receivedSpace.States == nil {
|
||||
receivedSpace.States = map[string]*State{}
|
||||
}
|
||||
delete(receivedSpace.States, shareID)
|
||||
if len(receivedSpace.States) == 0 {
|
||||
delete(rss.Spaces, spaceID)
|
||||
}
|
||||
|
||||
return c.persist(ctx, userID)
|
||||
}
|
||||
|
||||
log := appctx.GetLogger(ctx).With().
|
||||
Str("hostname", os.Getenv("HOSTNAME")).
|
||||
Str("userID", userID).
|
||||
Str("spaceID", spaceID).Logger()
|
||||
|
||||
var err error
|
||||
for retries := 100; retries > 0; retries-- {
|
||||
err = persistFunc()
|
||||
switch err.(type) {
|
||||
case nil:
|
||||
span.SetStatus(codes.Ok, "")
|
||||
return nil
|
||||
case errtypes.Aborted:
|
||||
log.Debug().Msg("aborted when persisting added received share: etag changed. retrying...")
|
||||
// this is the expected status code from the server when the if-match etag check fails
|
||||
// continue with sync below
|
||||
case errtypes.PreconditionFailed:
|
||||
log.Debug().Msg("precondition failed when persisting added received share: etag changed. retrying...")
|
||||
// actually, this is the wrong status code and we treat it like errtypes.Aborted because of inconsistencies on the server side
|
||||
// continue with sync below
|
||||
case errtypes.AlreadyExists:
|
||||
log.Debug().Msg("already exists when persisting added received share. retrying...")
|
||||
// CS3 uses an already exists error instead of precondition failed when using an If-None-Match=* header / IfExists flag in the InitiateFileUpload call.
|
||||
// Thas happens when the cache thinks there is no file.
|
||||
// continue with sync below
|
||||
default:
|
||||
span.SetStatus(codes.Error, fmt.Sprintf("persisting added received share failed. giving up: %s", err.Error()))
|
||||
log.Error().Err(err).Msg("persisting added received share failed")
|
||||
return err
|
||||
}
|
||||
if err := c.syncWithLock(ctx, userID); err != nil {
|
||||
span.RecordError(err)
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
log.Error().Err(err).Msg("persisting added received share failed. giving up.")
|
||||
return err
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// List returns a list of received shares for a given user
|
||||
// The return list is guaranteed to be thread-safe
|
||||
func (c *Cache) List(ctx context.Context, userID string) (map[string]*Space, error) {
|
||||
|
||||
5
vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/sharecache/sharecache.go
generated
vendored
5
vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/sharecache/sharecache.go
generated
vendored
@@ -214,6 +214,11 @@ func (c *Cache) Remove(ctx context.Context, userid, shareID string) error {
|
||||
log.Debug().Msg("precondition failed when persisting removed share: etag changed. retrying...")
|
||||
// actually, this is the wrong status code and we treat it like errtypes.Aborted because of inconsistencies on the server side
|
||||
// continue with sync below
|
||||
case errtypes.AlreadyExists:
|
||||
log.Debug().Msg("file already existed when persisting removed share. retrying...")
|
||||
// CS3 uses an already exists error instead of precondition failed when using an If-None-Match=* header / IfExists flag in the InitiateFileUpload call.
|
||||
// Thas happens when the cache thinks there is no file.
|
||||
// continue with sync below
|
||||
default:
|
||||
span.SetStatus(codes.Error, fmt.Sprintf("persisting removed share failed. giving up: %s", err.Error()))
|
||||
log.Error().Err(err).Msg("persisting removed share failed")
|
||||
|
||||
4
vendor/github.com/cs3org/reva/v2/pkg/storage/fs/ocis/ocis.go
generated
vendored
4
vendor/github.com/cs3org/reva/v2/pkg/storage/fs/ocis/ocis.go
generated
vendored
@@ -36,7 +36,7 @@ func init() {
|
||||
|
||||
// New returns an implementation to of the storage.FS interface that talk to
|
||||
// a local filesystem.
|
||||
func New(m map[string]interface{}, stream events.Stream, _ *zerolog.Logger) (storage.FS, error) {
|
||||
func New(m map[string]interface{}, stream events.Stream, log *zerolog.Logger) (storage.FS, error) {
|
||||
o, err := options.New(m)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -47,5 +47,5 @@ func New(m map[string]interface{}, stream events.Stream, _ *zerolog.Logger) (sto
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return decomposedfs.NewDefault(m, bs, stream)
|
||||
return decomposedfs.NewDefault(m, bs, stream, log)
|
||||
}
|
||||
|
||||
2
vendor/github.com/cs3org/reva/v2/pkg/storage/fs/posix/posix.go
generated
vendored
2
vendor/github.com/cs3org/reva/v2/pkg/storage/fs/posix/posix.go
generated
vendored
@@ -134,7 +134,7 @@ func New(m map[string]interface{}, stream events.Stream, log *zerolog.Logger) (s
|
||||
Trashbin: trashbin,
|
||||
}
|
||||
|
||||
dfs, err := decomposedfs.New(&o.Options, aspects)
|
||||
dfs, err := decomposedfs.New(&o.Options, aspects, log)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
2
vendor/github.com/cs3org/reva/v2/pkg/storage/fs/posix/tree/tree.go
generated
vendored
2
vendor/github.com/cs3org/reva/v2/pkg/storage/fs/posix/tree/tree.go
generated
vendored
@@ -704,6 +704,8 @@ func (t *Tree) ResolveSpaceIDIndexEntry(spaceid, entry string) (string, string,
|
||||
|
||||
// InitNewNode initializes a new node
|
||||
func (t *Tree) InitNewNode(ctx context.Context, n *node.Node, fsize uint64) (metadata.UnlockFunc, error) {
|
||||
_, span := tracer.Start(ctx, "InitNewNode")
|
||||
defer span.End()
|
||||
// create folder structure (if needed)
|
||||
if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil {
|
||||
return nil, err
|
||||
|
||||
4
vendor/github.com/cs3org/reva/v2/pkg/storage/fs/s3ng/s3ng.go
generated
vendored
4
vendor/github.com/cs3org/reva/v2/pkg/storage/fs/s3ng/s3ng.go
generated
vendored
@@ -35,7 +35,7 @@ func init() {
|
||||
|
||||
// New returns an implementation to of the storage.FS interface that talk to
|
||||
// a local filesystem.
|
||||
func New(m map[string]interface{}, stream events.Stream, _ *zerolog.Logger) (storage.FS, error) {
|
||||
func New(m map[string]interface{}, stream events.Stream, log *zerolog.Logger) (storage.FS, error) {
|
||||
o, err := parseConfig(m)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -59,5 +59,5 @@ func New(m map[string]interface{}, stream events.Stream, _ *zerolog.Logger) (sto
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return decomposedfs.NewDefault(m, bs, stream)
|
||||
return decomposedfs.NewDefault(m, bs, stream, log)
|
||||
}
|
||||
|
||||
14
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/decomposedfs.go
generated
vendored
14
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/decomposedfs.go
generated
vendored
@@ -35,6 +35,7 @@ import (
|
||||
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
|
||||
"github.com/jellydator/ttlcache/v2"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/rs/zerolog"
|
||||
tusd "github.com/tus/tusd/v2/pkg/handler"
|
||||
microstore "go-micro.dev/v4/store"
|
||||
"go.opentelemetry.io/otel"
|
||||
@@ -125,10 +126,12 @@ type Decomposedfs struct {
|
||||
userSpaceIndex *spaceidindex.Index
|
||||
groupSpaceIndex *spaceidindex.Index
|
||||
spaceTypeIndex *spaceidindex.Index
|
||||
|
||||
log *zerolog.Logger
|
||||
}
|
||||
|
||||
// NewDefault returns an instance with default components
|
||||
func NewDefault(m map[string]interface{}, bs tree.Blobstore, es events.Stream) (storage.FS, error) {
|
||||
func NewDefault(m map[string]interface{}, bs tree.Blobstore, es events.Stream, log *zerolog.Logger) (storage.FS, error) {
|
||||
o, err := options.New(m)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -169,14 +172,12 @@ func NewDefault(m map[string]interface{}, bs tree.Blobstore, es events.Stream) (
|
||||
Trashbin: &DecomposedfsTrashbin{},
|
||||
}
|
||||
|
||||
return New(o, aspects)
|
||||
return New(o, aspects, log)
|
||||
}
|
||||
|
||||
// New returns an implementation of the storage.FS interface that talks to
|
||||
// a local filesystem.
|
||||
func New(o *options.Options, aspects aspects.Aspects) (storage.FS, error) {
|
||||
log := logger.New()
|
||||
|
||||
func New(o *options.Options, aspects aspects.Aspects, log *zerolog.Logger) (storage.FS, error) {
|
||||
err := aspects.Tree.Setup()
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("could not setup tree")
|
||||
@@ -235,6 +236,7 @@ func New(o *options.Options, aspects aspects.Aspects) (storage.FS, error) {
|
||||
userSpaceIndex: userSpaceIndex,
|
||||
groupSpaceIndex: groupSpaceIndex,
|
||||
spaceTypeIndex: spaceTypeIndex,
|
||||
log: log,
|
||||
}
|
||||
fs.sessionStore = upload.NewSessionStore(fs, aspects, o.Root, o.AsyncFileUploads, o.Tokens)
|
||||
if err = fs.trashbin.Setup(fs); err != nil {
|
||||
@@ -311,7 +313,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
|
||||
keepUpload = true
|
||||
metrics.UploadSessionsAborted.Inc()
|
||||
case events.PPOutcomeContinue:
|
||||
if err := session.Finalize(); err != nil {
|
||||
if err := session.Finalize(ctx); err != nil {
|
||||
sublog.Error().Err(err).Msg("could not finalize upload")
|
||||
failed = true
|
||||
revertNodeMetadata = false
|
||||
|
||||
14
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/grants.go
generated
vendored
14
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/grants.go
generated
vendored
@@ -38,6 +38,8 @@ import (
|
||||
|
||||
// DenyGrant denies access to a resource.
|
||||
func (fs *Decomposedfs) DenyGrant(ctx context.Context, ref *provider.Reference, grantee *provider.Grantee) error {
|
||||
_, span := tracer.Start(ctx, "DenyGrant")
|
||||
defer span.End()
|
||||
log := appctx.GetLogger(ctx)
|
||||
|
||||
log.Debug().Interface("ref", ref).Interface("grantee", grantee).Msg("DenyGrant()")
|
||||
@@ -74,6 +76,8 @@ func (fs *Decomposedfs) DenyGrant(ctx context.Context, ref *provider.Reference,
|
||||
|
||||
// AddGrant adds a grant to a resource
|
||||
func (fs *Decomposedfs) AddGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) (err error) {
|
||||
_, span := tracer.Start(ctx, "AddGrant")
|
||||
defer span.End()
|
||||
log := appctx.GetLogger(ctx)
|
||||
log.Debug().Interface("ref", ref).Interface("grant", g).Msg("AddGrant()")
|
||||
grantNode, unlockFunc, grant, err := fs.loadGrant(ctx, ref, g)
|
||||
@@ -119,6 +123,8 @@ func (fs *Decomposedfs) AddGrant(ctx context.Context, ref *provider.Reference, g
|
||||
|
||||
// ListGrants lists the grants on the specified resource
|
||||
func (fs *Decomposedfs) ListGrants(ctx context.Context, ref *provider.Reference) (grants []*provider.Grant, err error) {
|
||||
_, span := tracer.Start(ctx, "ListGrants")
|
||||
defer span.End()
|
||||
var grantNode *node.Node
|
||||
if grantNode, err = fs.lu.NodeFromResource(ctx, ref); err != nil {
|
||||
return
|
||||
@@ -174,6 +180,8 @@ func (fs *Decomposedfs) ListGrants(ctx context.Context, ref *provider.Reference)
|
||||
|
||||
// RemoveGrant removes a grant from resource
|
||||
func (fs *Decomposedfs) RemoveGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) (err error) {
|
||||
_, span := tracer.Start(ctx, "RemoveGrant")
|
||||
defer span.End()
|
||||
grantNode, unlockFunc, grant, err := fs.loadGrant(ctx, ref, g)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -235,6 +243,8 @@ func isShareGrant(ctx context.Context) bool {
|
||||
// UpdateGrant updates a grant on a resource
|
||||
// TODO remove AddGrant or UpdateGrant grant from CS3 api, redundant? tracked in https://github.com/cs3org/cs3apis/issues/92
|
||||
func (fs *Decomposedfs) UpdateGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) error {
|
||||
_, span := tracer.Start(ctx, "UpdateGrant")
|
||||
defer span.End()
|
||||
log := appctx.GetLogger(ctx)
|
||||
log.Debug().Interface("ref", ref).Interface("grant", g).Msg("UpdateGrant()")
|
||||
|
||||
@@ -272,6 +282,8 @@ func (fs *Decomposedfs) UpdateGrant(ctx context.Context, ref *provider.Reference
|
||||
|
||||
// checks if the given grant exists and returns it. Nil grant means it doesn't exist
|
||||
func (fs *Decomposedfs) loadGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) (*node.Node, metadata.UnlockFunc, *provider.Grant, error) {
|
||||
_, span := tracer.Start(ctx, "loadGrant")
|
||||
defer span.End()
|
||||
n, err := fs.lu.NodeFromResource(ctx, ref)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
@@ -308,6 +320,8 @@ func (fs *Decomposedfs) loadGrant(ctx context.Context, ref *provider.Reference,
|
||||
}
|
||||
|
||||
func (fs *Decomposedfs) storeGrant(ctx context.Context, n *node.Node, g *provider.Grant) error {
|
||||
_, span := tracer.Start(ctx, "storeGrant")
|
||||
defer span.End()
|
||||
// if is a grant to a space root, the receiver needs the space type to update the indexes
|
||||
spaceType, ok := storageprovider.SpaceTypeFromContext(ctx)
|
||||
if !ok {
|
||||
|
||||
4
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata.go
generated
vendored
4
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata.go
generated
vendored
@@ -38,6 +38,8 @@ import (
|
||||
|
||||
// SetArbitraryMetadata sets the metadata on a resource
|
||||
func (fs *Decomposedfs) SetArbitraryMetadata(ctx context.Context, ref *provider.Reference, md *provider.ArbitraryMetadata) (err error) {
|
||||
_, span := tracer.Start(ctx, "SetArbitraryMetadata")
|
||||
defer span.End()
|
||||
n, err := fs.lu.NodeFromResource(ctx, ref)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "Decomposedfs: error resolving ref")
|
||||
@@ -131,6 +133,8 @@ func (fs *Decomposedfs) SetArbitraryMetadata(ctx context.Context, ref *provider.
|
||||
|
||||
// UnsetArbitraryMetadata unsets the metadata on the given resource
|
||||
func (fs *Decomposedfs) UnsetArbitraryMetadata(ctx context.Context, ref *provider.Reference, keys []string) (err error) {
|
||||
_, span := tracer.Start(ctx, "UnsetArbitraryMetadata")
|
||||
defer span.End()
|
||||
n, err := fs.lu.NodeFromResource(ctx, ref)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "Decomposedfs: error resolving ref")
|
||||
|
||||
@@ -34,3 +34,12 @@ func (m *Map[K, V]) Range(f func(key K, value V) bool) {
|
||||
}
|
||||
|
||||
func (m *Map[K, V]) Store(key K, value V) { m.m.Store(key, value) }
|
||||
|
||||
func (m *Map[K, V]) Count() int {
|
||||
l := 0
|
||||
m.Range(func(_ K, _ V) bool {
|
||||
l++
|
||||
return true
|
||||
})
|
||||
return l
|
||||
}
|
||||
|
||||
26
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/locks.go
generated
vendored
26
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/locks.go
generated
vendored
@@ -38,6 +38,8 @@ import (
|
||||
|
||||
// SetLock sets a lock on the node
|
||||
func (n *Node) SetLock(ctx context.Context, lock *provider.Lock) error {
|
||||
ctx, span := tracer.Start(ctx, "SetLock")
|
||||
defer span.End()
|
||||
lockFilePath := n.LockFilePath()
|
||||
|
||||
// ensure parent path exists
|
||||
@@ -89,22 +91,31 @@ func (n *Node) SetLock(ctx context.Context, lock *provider.Lock) error {
|
||||
|
||||
// ReadLock reads the lock id for a node
|
||||
func (n Node) ReadLock(ctx context.Context, skipFileLock bool) (*provider.Lock, error) {
|
||||
ctx, span := tracer.Start(ctx, "ReadLock")
|
||||
defer span.End()
|
||||
|
||||
// ensure parent path exists
|
||||
if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil {
|
||||
_, subspan := tracer.Start(ctx, "os.MkdirAll")
|
||||
err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700)
|
||||
subspan.End()
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "Decomposedfs: error creating parent folder for lock")
|
||||
}
|
||||
|
||||
// the caller of ReadLock already may hold a file lock
|
||||
if !skipFileLock {
|
||||
_, subspan := tracer.Start(ctx, "filelocks.AcquireReadLock")
|
||||
fileLock, err := filelocks.AcquireReadLock(n.InternalPath())
|
||||
subspan.End()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
_, subspan := tracer.Start(ctx, "filelocks.ReleaseLock")
|
||||
rerr := filelocks.ReleaseLock(fileLock)
|
||||
subspan.End()
|
||||
|
||||
// if err is non nil we do not overwrite that
|
||||
if err == nil {
|
||||
@@ -113,7 +124,10 @@ func (n Node) ReadLock(ctx context.Context, skipFileLock bool) (*provider.Lock,
|
||||
}()
|
||||
}
|
||||
|
||||
_, subspan = tracer.Start(ctx, "os.Open")
|
||||
f, err := os.Open(n.LockFilePath())
|
||||
subspan.End()
|
||||
|
||||
if err != nil {
|
||||
if errors.Is(err, fs.ErrNotExist) {
|
||||
return nil, errtypes.NotFound("no lock found")
|
||||
@@ -130,7 +144,11 @@ func (n Node) ReadLock(ctx context.Context, skipFileLock bool) (*provider.Lock,
|
||||
|
||||
// lock already expired
|
||||
if lock.Expiration != nil && time.Now().After(time.Unix(int64(lock.Expiration.Seconds), int64(lock.Expiration.Nanos))) {
|
||||
if err = os.Remove(f.Name()); err != nil {
|
||||
|
||||
_, subspan = tracer.Start(ctx, "os.Remove")
|
||||
err = os.Remove(f.Name())
|
||||
subspan.End()
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "Decomposedfs: could not remove expired lock file")
|
||||
}
|
||||
// we successfully deleted the expired lock
|
||||
@@ -142,6 +160,8 @@ func (n Node) ReadLock(ctx context.Context, skipFileLock bool) (*provider.Lock,
|
||||
|
||||
// RefreshLock refreshes the node's lock
|
||||
func (n *Node) RefreshLock(ctx context.Context, lock *provider.Lock, existingLockID string) error {
|
||||
ctx, span := tracer.Start(ctx, "RefreshLock")
|
||||
defer span.End()
|
||||
|
||||
// ensure parent path exists
|
||||
if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil {
|
||||
@@ -204,6 +224,8 @@ func (n *Node) RefreshLock(ctx context.Context, lock *provider.Lock, existingLoc
|
||||
|
||||
// Unlock unlocks the node
|
||||
func (n *Node) Unlock(ctx context.Context, lock *provider.Lock) error {
|
||||
ctx, span := tracer.Start(ctx, "Unlock")
|
||||
defer span.End()
|
||||
|
||||
// ensure parent path exists
|
||||
if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil {
|
||||
|
||||
7
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/node.go
generated
vendored
7
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/node.go
generated
vendored
@@ -218,6 +218,8 @@ func (n *Node) MarshalJSON() ([]byte, error) {
|
||||
|
||||
// Type returns the node's resource type
|
||||
func (n *Node) Type(ctx context.Context) provider.ResourceType {
|
||||
_, span := tracer.Start(ctx, "Type")
|
||||
defer span.End()
|
||||
if n.nodeType != nil {
|
||||
return *n.nodeType
|
||||
}
|
||||
@@ -446,6 +448,8 @@ func (n *Node) Child(ctx context.Context, name string) (*Node, error) {
|
||||
|
||||
// ParentWithReader returns the parent node
|
||||
func (n *Node) ParentWithReader(ctx context.Context, r io.Reader) (*Node, error) {
|
||||
_, span := tracer.Start(ctx, "ParentWithReader")
|
||||
defer span.End()
|
||||
if n.ParentID == "" {
|
||||
return nil, fmt.Errorf("decomposedfs: root has no parent")
|
||||
}
|
||||
@@ -1261,8 +1265,7 @@ func (n *Node) ProcessingID(ctx context.Context) (string, error) {
|
||||
|
||||
// IsSpaceRoot checks if the node is a space root
|
||||
func (n *Node) IsSpaceRoot(ctx context.Context) bool {
|
||||
_, err := n.Xattr(ctx, prefixes.SpaceNameAttr)
|
||||
return err == nil
|
||||
return n.ID == n.SpaceID
|
||||
}
|
||||
|
||||
// SetScanData sets the virus scan info to the node
|
||||
|
||||
2
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/xattrs.go
generated
vendored
2
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node/xattrs.go
generated
vendored
@@ -68,6 +68,8 @@ func (md Attributes) Time(key string) (time.Time, error) {
|
||||
|
||||
// SetXattrs sets multiple extended attributes on the write-through cache/node
|
||||
func (n *Node) SetXattrsWithContext(ctx context.Context, attribs map[string][]byte, acquireLock bool) (err error) {
|
||||
_, span := tracer.Start(ctx, "SetXattrsWithContext")
|
||||
defer span.End()
|
||||
if n.xattrsCache != nil {
|
||||
for k, v := range attribs {
|
||||
n.xattrsCache[k] = v
|
||||
|
||||
@@ -60,6 +60,8 @@ func (p Permissions) AssemblePermissions(ctx context.Context, n *node.Node) (*pr
|
||||
|
||||
// AssembleTrashPermissions is used to assemble file permissions
|
||||
func (p Permissions) AssembleTrashPermissions(ctx context.Context, n *node.Node) (*provider.ResourcePermissions, error) {
|
||||
_, span := tracer.Start(ctx, "AssembleTrashPermissions")
|
||||
defer span.End()
|
||||
return p.item.AssembleTrashPermissions(ctx, n)
|
||||
}
|
||||
|
||||
|
||||
9
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/recycle.go
generated
vendored
9
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/recycle.go
generated
vendored
@@ -64,7 +64,8 @@ func (tb *DecomposedfsTrashbin) Setup(fs storage.FS) error {
|
||||
// ListRecycle returns the list of available recycle items
|
||||
// ref -> the space (= resourceid), key -> deleted node id, relativePath = relative to key
|
||||
func (tb *DecomposedfsTrashbin) ListRecycle(ctx context.Context, ref *provider.Reference, key, relativePath string) ([]*provider.RecycleItem, error) {
|
||||
|
||||
_, span := tracer.Start(ctx, "ListRecycle")
|
||||
defer span.End()
|
||||
if ref == nil || ref.ResourceId == nil || ref.ResourceId.OpaqueId == "" {
|
||||
return nil, errtypes.BadRequest("spaceid required")
|
||||
}
|
||||
@@ -346,6 +347,8 @@ func (tb *DecomposedfsTrashbin) listTrashRoot(ctx context.Context, spaceID strin
|
||||
|
||||
// RestoreRecycleItem restores the specified item
|
||||
func (tb *DecomposedfsTrashbin) RestoreRecycleItem(ctx context.Context, ref *provider.Reference, key, relativePath string, restoreRef *provider.Reference) error {
|
||||
_, span := tracer.Start(ctx, "RestoreRecycleItem")
|
||||
defer span.End()
|
||||
if ref == nil {
|
||||
return errtypes.BadRequest("missing reference, needs a space id")
|
||||
}
|
||||
@@ -399,6 +402,8 @@ func (tb *DecomposedfsTrashbin) RestoreRecycleItem(ctx context.Context, ref *pro
|
||||
|
||||
// PurgeRecycleItem purges the specified item, all its children and all their revisions
|
||||
func (tb *DecomposedfsTrashbin) PurgeRecycleItem(ctx context.Context, ref *provider.Reference, key, relativePath string) error {
|
||||
_, span := tracer.Start(ctx, "PurgeRecycleItem")
|
||||
defer span.End()
|
||||
if ref == nil {
|
||||
return errtypes.BadRequest("missing reference, needs a space id")
|
||||
}
|
||||
@@ -429,6 +434,8 @@ func (tb *DecomposedfsTrashbin) PurgeRecycleItem(ctx context.Context, ref *provi
|
||||
|
||||
// EmptyRecycle empties the trash
|
||||
func (tb *DecomposedfsTrashbin) EmptyRecycle(ctx context.Context, ref *provider.Reference) error {
|
||||
_, span := tracer.Start(ctx, "EmptyRecycle")
|
||||
defer span.End()
|
||||
if ref == nil || ref.ResourceId == nil || ref.ResourceId.OpaqueId == "" {
|
||||
return errtypes.BadRequest("spaceid must be set")
|
||||
}
|
||||
|
||||
10
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/revisions.go
generated
vendored
10
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/revisions.go
generated
vendored
@@ -48,6 +48,8 @@ import (
|
||||
|
||||
// ListRevisions lists the revisions of the given resource
|
||||
func (fs *Decomposedfs) ListRevisions(ctx context.Context, ref *provider.Reference) (revisions []*provider.FileVersion, err error) {
|
||||
_, span := tracer.Start(ctx, "ListRevisions")
|
||||
defer span.End()
|
||||
var n *node.Node
|
||||
if n, err = fs.lu.NodeFromResource(ctx, ref); err != nil {
|
||||
return
|
||||
@@ -115,6 +117,8 @@ func (fs *Decomposedfs) ListRevisions(ctx context.Context, ref *provider.Referen
|
||||
// DownloadRevision returns a reader for the specified revision
|
||||
// FIXME the CS3 api should explicitly allow initiating revision and trash download, a related issue is https://github.com/cs3org/reva/issues/1813
|
||||
func (fs *Decomposedfs) DownloadRevision(ctx context.Context, ref *provider.Reference, revisionKey string, openReaderFunc func(md *provider.ResourceInfo) bool) (*provider.ResourceInfo, io.ReadCloser, error) {
|
||||
_, span := tracer.Start(ctx, "DownloadRevision")
|
||||
defer span.End()
|
||||
log := appctx.GetLogger(ctx)
|
||||
|
||||
// verify revision key format
|
||||
@@ -186,6 +190,8 @@ func (fs *Decomposedfs) DownloadRevision(ctx context.Context, ref *provider.Refe
|
||||
|
||||
// RestoreRevision restores the specified revision of the resource
|
||||
func (fs *Decomposedfs) RestoreRevision(ctx context.Context, ref *provider.Reference, revisionKey string) (returnErr error) {
|
||||
_, span := tracer.Start(ctx, "RestoreRevision")
|
||||
defer span.End()
|
||||
log := appctx.GetLogger(ctx)
|
||||
|
||||
// verify revision key format
|
||||
@@ -330,6 +336,8 @@ func (fs *Decomposedfs) RestoreRevision(ctx context.Context, ref *provider.Refer
|
||||
|
||||
// DeleteRevision deletes the specified revision of the resource
|
||||
func (fs *Decomposedfs) DeleteRevision(ctx context.Context, ref *provider.Reference, revisionKey string) error {
|
||||
_, span := tracer.Start(ctx, "DeleteRevision")
|
||||
defer span.End()
|
||||
n, err := fs.getRevisionNode(ctx, ref, revisionKey, func(rp *provider.ResourcePermissions) bool {
|
||||
return rp.RestoreFileVersion
|
||||
})
|
||||
@@ -345,6 +353,8 @@ func (fs *Decomposedfs) DeleteRevision(ctx context.Context, ref *provider.Refere
|
||||
}
|
||||
|
||||
func (fs *Decomposedfs) getRevisionNode(ctx context.Context, ref *provider.Reference, revisionKey string, hasPermission func(*provider.ResourcePermissions) bool) (*node.Node, error) {
|
||||
_, span := tracer.Start(ctx, "getRevisionNode")
|
||||
defer span.End()
|
||||
log := appctx.GetLogger(ctx)
|
||||
|
||||
// verify revision key format
|
||||
|
||||
@@ -183,7 +183,7 @@ func (p AsyncPropagator) queuePropagation(ctx context.Context, spaceID, nodeID s
|
||||
ready = true
|
||||
break
|
||||
}
|
||||
log.Error().Err(err).Msg("failed to write Change to disk (retrying)")
|
||||
log.Debug().Err(err).Msg("failed to write Change to disk (retrying)")
|
||||
err = os.Mkdir(filepath.Dir(changePath), 0700)
|
||||
triggerPropagation = err == nil || os.IsExist(err) // only the first goroutine, which succeeds to create the directory, is supposed to actually trigger the propagation
|
||||
}
|
||||
@@ -386,7 +386,7 @@ func (p AsyncPropagator) propagate(ctx context.Context, spaceID, nodeID string,
|
||||
// a negative new treesize. Something must have gone wrong with the accounting.
|
||||
// Reset the current treesize to 0.
|
||||
log.Error().Uint64("treeSize", treeSize).Int64("sizeDiff", pc.SizeDiff).
|
||||
Msg("Error when updating treesize of node. Updated treesize < 0. Reestting to 0")
|
||||
Msg("Error when updating treesize of node. Updated treesize < 0. Resetting to 0")
|
||||
newSize = 0
|
||||
default:
|
||||
newSize = treeSize - uint64(-pc.SizeDiff)
|
||||
@@ -414,7 +414,7 @@ func (p AsyncPropagator) propagate(ctx context.Context, spaceID, nodeID string,
|
||||
log.Info().Msg("Propagation done. cleaning up")
|
||||
cleanup()
|
||||
|
||||
if !n.IsSpaceRoot(ctx) { // This does not seem robust as it checks the space name property
|
||||
if !n.IsSpaceRoot(ctx) {
|
||||
p.queuePropagation(ctx, n.SpaceID, n.ParentID, pc, log)
|
||||
}
|
||||
|
||||
|
||||
36
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/tree.go
generated
vendored
36
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree/tree.go
generated
vendored
@@ -106,6 +106,8 @@ func (t *Tree) Setup() error {
|
||||
|
||||
// GetMD returns the metadata of a node in the tree
|
||||
func (t *Tree) GetMD(ctx context.Context, n *node.Node) (os.FileInfo, error) {
|
||||
_, span := tracer.Start(ctx, "GetMD")
|
||||
defer span.End()
|
||||
md, err := os.Stat(n.InternalPath())
|
||||
if err != nil {
|
||||
if errors.Is(err, fs.ErrNotExist) {
|
||||
@@ -119,6 +121,8 @@ func (t *Tree) GetMD(ctx context.Context, n *node.Node) (os.FileInfo, error) {
|
||||
|
||||
// TouchFile creates a new empty file
|
||||
func (t *Tree) TouchFile(ctx context.Context, n *node.Node, markprocessing bool, mtime string) error {
|
||||
_, span := tracer.Start(ctx, "TouchFile")
|
||||
defer span.End()
|
||||
if n.Exists {
|
||||
if markprocessing {
|
||||
return n.SetXattr(ctx, prefixes.StatusPrefix, []byte(node.ProcessingStatus))
|
||||
@@ -223,6 +227,8 @@ func (t *Tree) CreateDir(ctx context.Context, n *node.Node) (err error) {
|
||||
|
||||
// Move replaces the target with the source
|
||||
func (t *Tree) Move(ctx context.Context, oldNode *node.Node, newNode *node.Node) (err error) {
|
||||
_, span := tracer.Start(ctx, "Move")
|
||||
defer span.End()
|
||||
if oldNode.SpaceID != newNode.SpaceID {
|
||||
// WebDAV RFC https://www.rfc-editor.org/rfc/rfc4918#section-9.9.4 says to use
|
||||
// > 502 (Bad Gateway) - This may occur when the destination is on another
|
||||
@@ -432,6 +438,8 @@ func (t *Tree) ListFolder(ctx context.Context, n *node.Node) ([]*node.Node, erro
|
||||
|
||||
// Delete deletes a node in the tree by moving it to the trash
|
||||
func (t *Tree) Delete(ctx context.Context, n *node.Node) (err error) {
|
||||
_, span := tracer.Start(ctx, "Delete")
|
||||
defer span.End()
|
||||
path := filepath.Join(n.ParentPath(), n.Name)
|
||||
// remove entry from cache immediately to avoid inconsistencies
|
||||
defer func() { _ = t.idCache.Delete(path) }()
|
||||
@@ -524,6 +532,8 @@ func (t *Tree) Delete(ctx context.Context, n *node.Node) (err error) {
|
||||
|
||||
// RestoreRecycleItemFunc returns a node and a function to restore it from the trash.
|
||||
func (t *Tree) RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPath string, targetNode *node.Node) (*node.Node, *node.Node, func() error, error) {
|
||||
_, span := tracer.Start(ctx, "RestoreRecycleItemFunc")
|
||||
defer span.End()
|
||||
logger := appctx.GetLogger(ctx)
|
||||
|
||||
recycleNode, trashItem, deletedNodePath, origin, err := t.readRecycleItem(ctx, spaceid, key, trashPath)
|
||||
@@ -623,6 +633,8 @@ func (t *Tree) RestoreRecycleItemFunc(ctx context.Context, spaceid, key, trashPa
|
||||
|
||||
// PurgeRecycleItemFunc returns a node and a function to purge it from the trash
|
||||
func (t *Tree) PurgeRecycleItemFunc(ctx context.Context, spaceid, key string, path string) (*node.Node, func() error, error) {
|
||||
_, span := tracer.Start(ctx, "PurgeRecycleItemFunc")
|
||||
defer span.End()
|
||||
logger := appctx.GetLogger(ctx)
|
||||
|
||||
rn, trashItem, deletedNodePath, _, err := t.readRecycleItem(ctx, spaceid, key, path)
|
||||
@@ -664,25 +676,38 @@ func (t *Tree) PurgeRecycleItemFunc(ctx context.Context, spaceid, key string, pa
|
||||
|
||||
// InitNewNode initializes a new node
|
||||
func (t *Tree) InitNewNode(ctx context.Context, n *node.Node, fsize uint64) (metadata.UnlockFunc, error) {
|
||||
_, span := tracer.Start(ctx, "InitNewNode")
|
||||
defer span.End()
|
||||
// create folder structure (if needed)
|
||||
if err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700); err != nil {
|
||||
|
||||
_, subspan := tracer.Start(ctx, "os.MkdirAll")
|
||||
err := os.MkdirAll(filepath.Dir(n.InternalPath()), 0700)
|
||||
subspan.End()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// create and write lock new node metadata
|
||||
_, subspan = tracer.Start(ctx, "metadata.Lock")
|
||||
unlock, err := t.lookup.MetadataBackend().Lock(n.InternalPath())
|
||||
subspan.End()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// we also need to touch the actual node file here it stores the mtime of the resource
|
||||
_, subspan = tracer.Start(ctx, "os.OpenFile")
|
||||
h, err := os.OpenFile(n.InternalPath(), os.O_CREATE|os.O_EXCL, 0600)
|
||||
subspan.End()
|
||||
if err != nil {
|
||||
return unlock, err
|
||||
}
|
||||
h.Close()
|
||||
|
||||
if _, err := node.CheckQuota(ctx, n.SpaceRoot, false, 0, fsize); err != nil {
|
||||
_, subspan = tracer.Start(ctx, "node.CheckQuota")
|
||||
_, err = node.CheckQuota(ctx, n.SpaceRoot, false, 0, fsize)
|
||||
subspan.End()
|
||||
if err != nil {
|
||||
return unlock, err
|
||||
}
|
||||
|
||||
@@ -692,7 +717,10 @@ func (t *Tree) InitNewNode(ctx context.Context, n *node.Node, fsize uint64) (met
|
||||
log := appctx.GetLogger(ctx).With().Str("childNameLink", childNameLink).Str("relativeNodePath", relativeNodePath).Logger()
|
||||
log.Info().Msg("initNewNode: creating symlink")
|
||||
|
||||
if err = os.Symlink(relativeNodePath, childNameLink); err != nil {
|
||||
_, subspan = tracer.Start(ctx, "os.Symlink")
|
||||
err = os.Symlink(relativeNodePath, childNameLink)
|
||||
subspan.End()
|
||||
if err != nil {
|
||||
log.Info().Err(err).Msg("initNewNode: symlink failed")
|
||||
if errors.Is(err, fs.ErrExist) {
|
||||
log.Info().Err(err).Msg("initNewNode: symlink already exists")
|
||||
@@ -854,6 +882,8 @@ var nodeIDRegep = regexp.MustCompile(`.*/nodes/([^.]*).*`)
|
||||
|
||||
// TODO refactor the returned params into Node properties? would make all the path transformations go away...
|
||||
func (t *Tree) readRecycleItem(ctx context.Context, spaceID, key, path string) (recycleNode *node.Node, trashItem string, deletedNodePath string, origin string, err error) {
|
||||
_, span := tracer.Start(ctx, "readRecycleItem")
|
||||
defer span.End()
|
||||
logger := appctx.GetLogger(ctx)
|
||||
|
||||
if key == "" {
|
||||
|
||||
4
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload.go
generated
vendored
4
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload.go
generated
vendored
@@ -47,6 +47,8 @@ import (
|
||||
// TODO Upload (and InitiateUpload) needs a way to receive the expected checksum.
|
||||
// Maybe in metadata as 'checksum' => 'sha1 aeosvp45w5xaeoe' = lowercase, space separated?
|
||||
func (fs *Decomposedfs) Upload(ctx context.Context, req storage.UploadRequest, uff storage.UploadFinishedFunc) (*provider.ResourceInfo, error) {
|
||||
_, span := tracer.Start(ctx, "Upload")
|
||||
defer span.End()
|
||||
up, err := fs.GetUpload(ctx, req.Ref.GetPath())
|
||||
if err != nil {
|
||||
return &provider.ResourceInfo{}, errors.Wrap(err, "Decomposedfs: error retrieving upload")
|
||||
@@ -130,6 +132,8 @@ func (fs *Decomposedfs) Upload(ctx context.Context, req storage.UploadRequest, u
|
||||
// TODO read optional content for small files in this request
|
||||
// TODO InitiateUpload (and Upload) needs a way to receive the expected checksum. Maybe in metadata as 'checksum' => 'sha1 aeosvp45w5xaeoe' = lowercase, space separated?
|
||||
func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (map[string]string, error) {
|
||||
_, span := tracer.Start(ctx, "InitiateUpload")
|
||||
defer span.End()
|
||||
log := appctx.GetLogger(ctx)
|
||||
|
||||
// remember the path from the reference
|
||||
|
||||
4
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/session.go
generated
vendored
4
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/session.go
generated
vendored
@@ -81,6 +81,8 @@ func (s *OcisSession) executantUser() *userpb.User {
|
||||
|
||||
// Purge deletes the upload session metadata and written binary data
|
||||
func (s *OcisSession) Purge(ctx context.Context) error {
|
||||
_, span := tracer.Start(ctx, "Purge")
|
||||
defer span.End()
|
||||
sessionPath := sessionPath(s.store.root, s.info.ID)
|
||||
f, err := lockedfile.OpenFile(sessionPath+".lock", os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0600)
|
||||
if err != nil {
|
||||
@@ -112,6 +114,8 @@ func (s *OcisSession) TouchBin() error {
|
||||
// events can update the scan outcome and the finished event might read an empty file because of race conditions
|
||||
// so we need to lock the file while writing and use atomic writes
|
||||
func (s *OcisSession) Persist(ctx context.Context) error {
|
||||
_, span := tracer.Start(ctx, "Persist")
|
||||
defer span.End()
|
||||
sessionPath := sessionPath(s.store.root, s.info.ID)
|
||||
// create folder structure (if needed)
|
||||
if err := os.MkdirAll(filepath.Dir(sessionPath), 0700); err != nil {
|
||||
|
||||
8
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/store.go
generated
vendored
8
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/store.go
generated
vendored
@@ -199,8 +199,8 @@ func (store OcisStore) Cleanup(ctx context.Context, session Session, revertNodeM
|
||||
// CreateNodeForUpload will create the target node for the Upload
|
||||
// TODO move this to the node package as NodeFromUpload?
|
||||
// should we in InitiateUpload create the node first? and then the upload?
|
||||
func (store OcisStore) CreateNodeForUpload(session *OcisSession, initAttrs node.Attributes) (*node.Node, error) {
|
||||
ctx, span := tracer.Start(session.Context(context.Background()), "CreateNodeForUpload")
|
||||
func (store OcisStore) CreateNodeForUpload(ctx context.Context, session *OcisSession, initAttrs node.Attributes) (*node.Node, error) {
|
||||
ctx, span := tracer.Start(session.Context(ctx), "CreateNodeForUpload")
|
||||
defer span.End()
|
||||
n := node.New(
|
||||
session.SpaceID(),
|
||||
@@ -303,6 +303,8 @@ func (store OcisStore) CreateNodeForUpload(session *OcisSession, initAttrs node.
|
||||
}
|
||||
|
||||
func (store OcisStore) updateExistingNode(ctx context.Context, session *OcisSession, n *node.Node, spaceID string, fsize uint64) (metadata.UnlockFunc, error) {
|
||||
_, span := tracer.Start(ctx, "updateExistingNode")
|
||||
defer span.End()
|
||||
targetPath := n.InternalPath()
|
||||
|
||||
// write lock existing node before reading any metadata
|
||||
@@ -388,6 +390,7 @@ func (store OcisStore) updateExistingNode(ctx context.Context, session *OcisSess
|
||||
}
|
||||
|
||||
// clean revision file
|
||||
span.AddEvent("os.Create")
|
||||
if _, err := os.Create(versionPath); err != nil {
|
||||
return unlock, err
|
||||
}
|
||||
@@ -405,6 +408,7 @@ func (store OcisStore) updateExistingNode(ctx context.Context, session *OcisSess
|
||||
}
|
||||
session.info.MetaData["versionsPath"] = versionPath
|
||||
// keep mtime from previous version
|
||||
span.AddEvent("os.Chtimes")
|
||||
if err := os.Chtimes(session.info.MetaData["versionsPath"], oldNodeMtime, oldNodeMtime); err != nil {
|
||||
return unlock, errtypes.InternalError(fmt.Sprintf("failed to change mtime of version node: %s", err))
|
||||
}
|
||||
|
||||
8
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/upload.go
generated
vendored
8
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/upload/upload.go
generated
vendored
@@ -184,7 +184,7 @@ func (session *OcisSession) FinishUploadDecomposed(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
n, err := session.store.CreateNodeForUpload(session, attrs)
|
||||
n, err := session.store.CreateNodeForUpload(ctx, session, attrs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -226,7 +226,7 @@ func (session *OcisSession) FinishUploadDecomposed(ctx context.Context) error {
|
||||
// for 0-byte uploads we take a shortcut and finalize isn't called elsewhere
|
||||
if !session.store.async || session.info.Size == 0 {
|
||||
// handle postprocessing synchronously
|
||||
err = session.Finalize()
|
||||
err = session.Finalize(ctx)
|
||||
session.store.Cleanup(ctx, session, err != nil, false, err == nil)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("failed to upload")
|
||||
@@ -279,8 +279,8 @@ func (session *OcisSession) ConcatUploads(_ context.Context, uploads []tusd.Uplo
|
||||
}
|
||||
|
||||
// Finalize finalizes the upload (eg moves the file to the internal destination)
|
||||
func (session *OcisSession) Finalize() (err error) {
|
||||
ctx, span := tracer.Start(session.Context(context.Background()), "Finalize")
|
||||
func (session *OcisSession) Finalize(ctx context.Context) (err error) {
|
||||
ctx, span := tracer.Start(session.Context(ctx), "Finalize")
|
||||
defer span.End()
|
||||
|
||||
revisionNode := node.New(session.SpaceID(), session.NodeID(), "", "", session.Size(), session.ID(),
|
||||
|
||||
6
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/metadata/cs3.go
generated
vendored
6
vendor/github.com/cs3org/reva/v2/pkg/storage/utils/metadata/cs3.go
generated
vendored
@@ -552,7 +552,11 @@ func (cs3 *CS3) getAuthContext(ctx context.Context) (context.Context, error) {
|
||||
authCtx, span := tracer.Start(authCtx, "getAuthContext", trace.WithLinks(trace.LinkFromContext(ctx)))
|
||||
defer span.End()
|
||||
|
||||
client, err := pool.GetGatewayServiceClient(cs3.gatewayAddr)
|
||||
selector, err := pool.GatewaySelector(cs3.gatewayAddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
client, err := selector.Next()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
2
vendor/modules.txt
vendored
2
vendor/modules.txt
vendored
@@ -367,7 +367,7 @@ github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1
|
||||
github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1
|
||||
github.com/cs3org/go-cs3apis/cs3/tx/v1beta1
|
||||
github.com/cs3org/go-cs3apis/cs3/types/v1beta1
|
||||
# github.com/cs3org/reva/v2 v2.26.7
|
||||
# github.com/cs3org/reva/v2 v2.26.8-0.20241203081301-17f339546533
|
||||
## explicit; go 1.22.0
|
||||
github.com/cs3org/reva/v2/cmd/revad/internal/grace
|
||||
github.com/cs3org/reva/v2/cmd/revad/runtime
|
||||
|
||||
Reference in New Issue
Block a user