[full-ci] bump reva to v2.26.7

Author: Roman Perekhod
Date: 2024-11-20 13:55:31 +01:00
Parent: bd4cf280df
Commit: 2b07916f22
44 changed files with 292 additions and 1937 deletions

View File

@@ -197,7 +197,11 @@ func buildOpenInAppRequest(ctx context.Context, ri *storageprovider.ResourceInfo
}
// build a fake user object for the token
currentuser := ctxpkg.ContextMustGetUser(ctx)
currentuser, ok := ctxpkg.ContextGetUser(ctx)
if !ok {
return nil, errors.New("user not found in context")
}
scopedUser := &userpb.User{
Id: ri.GetOwner(), // the owner of the resource is always set, right?
DisplayName: "View Only user for " + currentuser.GetUsername(),

View File

@@ -48,7 +48,7 @@ import (
"github.com/cs3org/reva/v2/pkg/share"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/golang-jwt/jwt"
"github.com/golang-jwt/jwt/v5"
"github.com/pkg/errors"
gstatus "google.golang.org/grpc/status"
)
@@ -78,7 +78,7 @@ import (
// transferClaims are custom claims for a JWT token to be used between the metadata and data gateways.
type transferClaims struct {
jwt.StandardClaims
jwt.RegisteredClaims
Target string `json:"target"`
}
@@ -86,10 +86,10 @@ func (s *svc) sign(_ context.Context, target string, expiresAt int64) (string, e
// Tus sends a separate request to the datagateway service for every chunk.
// For large files, this can take a long time, so we extend the expiration
claims := transferClaims{
StandardClaims: jwt.StandardClaims{
ExpiresAt: expiresAt,
Audience: "reva",
IssuedAt: time.Now().Unix(),
RegisteredClaims: jwt.RegisteredClaims{
ExpiresAt: jwt.NewNumericDate(time.Unix(expiresAt, 0)),
Audience: jwt.ClaimStrings{"reva"},
IssuedAt: jwt.NewNumericDate(time.Now()),
},
Target: target,
}
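golang-jwt v5 replaces StandardClaims with RegisteredClaims: ExpiresAt and IssuedAt change from int64 Unix timestamps to *jwt.NumericDate, and Audience becomes jwt.ClaimStrings (a string slice). A minimal sketch of minting a transfer token under the new API, assuming an HS256 secret as in the gateway config:

import (
	"time"

	"github.com/golang-jwt/jwt/v5"
)

type transferClaims struct {
	jwt.RegisteredClaims
	Target string `json:"target"`
}

func mintTransferToken(secret, target string, ttl time.Duration) (string, error) {
	claims := transferClaims{
		RegisteredClaims: jwt.RegisteredClaims{
			// v3: ExpiresAt: time.Now().Add(ttl).Unix()
			ExpiresAt: jwt.NewNumericDate(time.Now().Add(ttl)),
			IssuedAt:  jwt.NewNumericDate(time.Now()),
			// v3: Audience: "reva" (a plain string)
			Audience: jwt.ClaimStrings{"reva"},
		},
		Target: target,
	}
	t := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
	return t.SignedString([]byte(secret))
}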
@@ -428,7 +428,10 @@ func (s *svc) DeleteStorageSpace(ctx context.Context, req *provider.DeleteStorag
}
func (s *svc) GetHome(ctx context.Context, _ *provider.GetHomeRequest) (*provider.GetHomeResponse, error) {
currentUser := ctxpkg.ContextMustGetUser(ctx)
currentUser, ok := ctxpkg.ContextGetUser(ctx)
if !ok {
return nil, errors.New("user not found in context")
}
srClient, err := s.getStorageRegistryClient(ctx, s.c.StorageRegistryEndpoint)
if err != nil {

View File

@@ -28,6 +28,7 @@ import (
sdk "github.com/cs3org/reva/v2/pkg/sdk/common"
"github.com/cs3org/reva/v2/pkg/storage/cache"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/pkg/errors"
"google.golang.org/grpc"
)
@@ -44,7 +45,12 @@ func (c *cachedRegistryClient) ListStorageProviders(ctx context.Context, in *reg
spaceID := sdk.DecodeOpaqueMap(in.Opaque)["space_id"]
key := c.cache.GetKey(ctxpkg.ContextMustGetUser(ctx).GetId(), spaceID)
u, ok := ctxpkg.ContextGetUser(ctx)
if !ok {
return nil, errors.New("user not found in context")
}
key := c.cache.GetKey(u.GetId(), spaceID)
if key != "" {
s := &registry.ListStorageProvidersResponse{}
if err := c.cache.PullFromCache(key, s); err == nil {
@@ -89,7 +95,12 @@ type cachedSpacesAPIClient struct {
// CreateStorageSpace creates a storage space
func (c *cachedSpacesAPIClient) CreateStorageSpace(ctx context.Context, in *provider.CreateStorageSpaceRequest, opts ...grpc.CallOption) (*provider.CreateStorageSpaceResponse, error) {
if in.Type == "personal" {
key := c.createPersonalSpaceCache.GetKey(ctxpkg.ContextMustGetUser(ctx).GetId())
u, ok := ctxpkg.ContextGetUser(ctx)
if !ok {
return nil, errors.New("user not found in context")
}
key := c.createPersonalSpaceCache.GetKey(u.GetId())
if key != "" {
s := &provider.CreateStorageSpaceResponse{}
if err := c.createPersonalSpaceCache.PullFromCache(key, s); err == nil {
@@ -132,7 +143,12 @@ type cachedAPIClient struct {
// CreateHome caches calls to CreateHome locally - anyways they only need to be called once per user
func (c *cachedAPIClient) CreateHome(ctx context.Context, in *provider.CreateHomeRequest, opts ...grpc.CallOption) (*provider.CreateHomeResponse, error) {
key := c.createPersonalSpaceCache.GetKey(ctxpkg.ContextMustGetUser(ctx).GetId())
u, ok := ctxpkg.ContextGetUser(ctx)
if !ok {
return nil, errors.New("user not found in context")
}
key := c.createPersonalSpaceCache.GetKey(u.GetId())
if key != "" {
s := &provider.CreateHomeResponse{}
if err := c.createPersonalSpaceCache.PullFromCache(key, s); err == nil {
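These wrapper clients implement a read-through cache: build a key from the (now safely fetched) user id, serve from the cache on a hit, otherwise delegate to the wrapped client and store the result. The hunk is truncated above, so the following sketch fills in the remainder with assumed names (next for the wrapped client, PushToCache for the store call):

func (c *cachedAPIClient) createHomeCached(ctx context.Context, in *provider.CreateHomeRequest, opts ...grpc.CallOption) (*provider.CreateHomeResponse, error) {
	u, ok := ctxpkg.ContextGetUser(ctx)
	if !ok {
		return nil, errors.New("user not found in context")
	}
	key := c.createPersonalSpaceCache.GetKey(u.GetId())
	if key != "" {
		s := &provider.CreateHomeResponse{}
		if err := c.createPersonalSpaceCache.PullFromCache(key, s); err == nil {
			return s, nil // cache hit, skip the gRPC call
		}
	}
	res, err := c.next.CreateHome(ctx, in, opts...)
	if err != nil {
		return res, err
	}
	_ = c.createPersonalSpaceCache.PushToCache(key, res) // best-effort fill
	return res, nil
}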

View File

@@ -139,7 +139,11 @@ func (s *svc) updateShare(ctx context.Context, req *collaboration.UpdateShareReq
}
if s.c.CommitShareToStorageGrant {
creator := ctxpkg.ContextMustGetUser(ctx)
creator, ok := ctxpkg.ContextGetUser(ctx)
if !ok {
return nil, errors.New("user not found in context")
}
grant := &provider.Grant{
Grantee: res.GetShare().GetGrantee(),
Permissions: res.GetShare().GetPermissions().GetPermissions(),
@@ -198,11 +202,16 @@ func (s *svc) updateSpaceShare(ctx context.Context, req *collaboration.UpdateSha
req.Share.Expiration = existsGrant.GetExpiration()
}
u, ok := ctxpkg.ContextGetUser(ctx)
if !ok {
return nil, errors.New("user not found in context")
}
grant := &provider.Grant{
Grantee: req.GetShare().GetGrantee(),
Permissions: req.GetShare().GetPermissions().GetPermissions(),
Expiration: req.GetShare().GetExpiration(),
Creator: ctxpkg.ContextMustGetUser(ctx).GetId(),
Creator: u.GetId(),
}
if grant.GetPermissions() == nil {
@@ -410,7 +419,11 @@ func (s *svc) addGrant(ctx context.Context, id *provider.ResourceId, g *provider
ResourceId: id,
}
creator := ctxpkg.ContextMustGetUser(ctx)
creator, ok := ctxpkg.ContextGetUser(ctx)
if !ok {
return nil, errors.New("user not found in context")
}
grantReq := &provider.AddGrantRequest{
Ref: ref,
Grant: &provider.Grant{

View File

@@ -31,13 +31,9 @@ import (
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
typespb "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/cs3org/reva/v2/internal/http/services/datagateway"
"github.com/cs3org/reva/v2/internal/http/services/owncloud/ocdav/net"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/rgrpc/status"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/rhttp"
"github.com/cs3org/reva/v2/pkg/rhttp/global"
"github.com/cs3org/reva/v2/pkg/sharedconf"
"github.com/cs3org/reva/v2/pkg/storagespace"
@@ -230,92 +226,42 @@ func (s *svc) handleNew(w http.ResponseWriter, r *http.Request) {
return
}
// Create empty file via storageprovider
createReq := &provider.InitiateFileUploadRequest{
touchFileReq := &provider.TouchFileRequest{
Ref: fileRef,
Opaque: &typespb.Opaque{
Map: map[string]*typespb.OpaqueEntry{
"Upload-Length": {
Decoder: "plain",
Value: []byte("0"),
},
},
},
}
// having a client.CreateFile() function would come in handy here...
createRes, err := client.InitiateFileUpload(ctx, createReq)
touchRes, err := client.TouchFile(ctx, touchFileReq)
if err != nil {
writeError(w, r, appErrorServerError, "error calling InitiateFileUpload", err)
writeError(w, r, appErrorServerError, "error sending a grpc touchfile request", err)
return
}
if createRes.Status.Code != rpc.Code_CODE_OK {
switch createRes.Status.Code {
case rpc.Code_CODE_PERMISSION_DENIED:
if touchRes.Status.Code != rpc.Code_CODE_OK {
if touchRes.Status.Code == rpc.Code_CODE_PERMISSION_DENIED {
writeError(w, r, appErrorPermissionDenied, "permission denied to create the file", nil)
return
case rpc.Code_CODE_NOT_FOUND:
writeError(w, r, appErrorNotFound, "parent container does not exist", nil)
return
default:
writeError(w, r, appErrorServerError, "error calling InitiateFileUpload", nil)
return
}
writeError(w, r, appErrorServerError, "touching the file failed", nil)
return
}
// Do a HTTP PUT with an empty body
var ep, token string
for _, p := range createRes.Protocols {
if p.Protocol == "simple" {
ep, token = p.UploadEndpoint, p.Token
}
}
httpReq, err := rhttp.NewRequest(ctx, http.MethodPut, ep, nil)
// Stat the newly created file
statRes, err := client.Stat(ctx, statFileReq)
if err != nil {
writeError(w, r, appErrorServerError, "failed to create the file", err)
writeError(w, r, appErrorServerError, "statting the created file failed", err)
return
}
httpReq.Header.Set(datagateway.TokenTransportHeader, token)
httpRes, err := rhttp.GetHTTPClient(
rhttp.Context(ctx),
rhttp.Insecure(s.conf.Insecure),
).Do(httpReq)
if err != nil {
writeError(w, r, appErrorServerError, "failed to create the file", err)
return
}
defer httpRes.Body.Close()
if httpRes.StatusCode == http.StatusBadRequest {
// the file upload was already finished since it is a zero byte file
} else if httpRes.StatusCode != http.StatusOK {
writeError(w, r, appErrorServerError, "failed to create the file", nil)
if statRes.Status.Code != rpc.Code_CODE_OK {
writeError(w, r, appErrorServerError, "statting the created file failed", nil)
return
}
var fileid string
if httpRes.Header.Get(net.HeaderOCFileID) != "" {
fileid = httpRes.Header.Get(net.HeaderOCFileID)
} else {
// Stat the newly created file
statRes, err := client.Stat(ctx, statFileReq)
if err != nil {
writeError(w, r, appErrorServerError, "statting the created file failed", err)
return
}
if statRes.Status.Code != rpc.Code_CODE_OK {
writeError(w, r, appErrorServerError, "statting the created file failed", nil)
return
}
if statRes.Info.Type != provider.ResourceType_RESOURCE_TYPE_FILE {
writeError(w, r, appErrorInvalidParameter, "the given file id does not point to a file", nil)
return
}
fileid = storagespace.FormatResourceID(statRes.Info.Id)
if statRes.Info.Type != provider.ResourceType_RESOURCE_TYPE_FILE {
writeError(w, r, appErrorInvalidParameter, "the given file id does not point to a file", nil)
return
}
fileid := storagespace.FormatResourceID(statRes.Info.Id)
js, err := json.Marshal(
map[string]interface{}{
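The handleNew rewrite drops the old create-by-upload dance (InitiateFileUpload followed by an empty HTTP PUT against the datagateway) in favor of a single TouchFile gRPC call plus a Stat to learn the new file's id, which is why the datagateway, rhttp and net imports disappear above. A condensed sketch of the new flow (imports as in the hunk; the helper name is illustrative):

func createEmptyFile(ctx context.Context, client gateway.GatewayAPIClient, fileRef *provider.Reference) (string, error) {
	// one gRPC round trip creates the empty file; no upload endpoint or transfer token needed
	touchRes, err := client.TouchFile(ctx, &provider.TouchFileRequest{Ref: fileRef})
	if err != nil || touchRes.GetStatus().GetCode() != rpc.Code_CODE_OK {
		return "", fmt.Errorf("touch failed: %v", err)
	}
	// stat the new file; its resource id becomes the file id in the JSON response
	statRes, err := client.Stat(ctx, &provider.StatRequest{Ref: fileRef})
	if err != nil || statRes.GetStatus().GetCode() != rpc.Code_CODE_OK {
		return "", fmt.Errorf("stat failed: %v", err)
	}
	if statRes.GetInfo().GetType() != provider.ResourceType_RESOURCE_TYPE_FILE {
		return "", fmt.Errorf("the given file id does not point to a file")
	}
	return storagespace.FormatResourceID(statRes.GetInfo().GetId()), nil
}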

View File

@@ -32,7 +32,7 @@ import (
"github.com/cs3org/reva/v2/pkg/rhttp"
"github.com/cs3org/reva/v2/pkg/rhttp/global"
"github.com/cs3org/reva/v2/pkg/sharedconf"
"github.com/golang-jwt/jwt"
"github.com/golang-jwt/jwt/v5"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/rs/zerolog"
@@ -58,7 +58,7 @@ func init() {
// transferClaims are custom claims for a JWT token to be used between the metadata and data gateways.
type transferClaims struct {
jwt.StandardClaims
jwt.RegisteredClaims
Target string `json:"target"`
}
type config struct {

View File

@@ -47,7 +47,7 @@ import (
"github.com/cs3org/reva/v2/pkg/sharedconf"
"github.com/cs3org/reva/v2/pkg/storage/utils/templates"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/golang-jwt/jwt"
"github.com/golang-jwt/jwt/v5"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
)
@@ -379,16 +379,16 @@ func getAppURLs(c *config) (map[string]map[string]string, error) {
func (p *wopiProvider) getAccessTokenTTL(ctx context.Context) (string, error) {
tkn := ctxpkg.ContextMustGetToken(ctx)
token, err := jwt.ParseWithClaims(tkn, &jwt.StandardClaims{}, func(token *jwt.Token) (interface{}, error) {
token, err := jwt.ParseWithClaims(tkn, &jwt.RegisteredClaims{}, func(token *jwt.Token) (interface{}, error) {
return []byte(p.conf.JWTSecret), nil
})
if err != nil {
return "", err
}
if claims, ok := token.Claims.(*jwt.StandardClaims); ok && token.Valid {
if claims, ok := token.Claims.(*jwt.RegisteredClaims); ok && token.Valid {
// milliseconds since Jan 1, 1970 UTC as required in https://wopi.readthedocs.io/projects/wopirest/en/latest/concepts.html?highlight=access_token_ttl#term-access-token-ttl
return strconv.FormatInt(claims.ExpiresAt*1000, 10), nil
return strconv.FormatInt(claims.ExpiresAt.Unix()*1000, 10), nil
}
return "", errtypes.InvalidCredentials("wopi: invalid token present in ctx")

View File

@@ -21,13 +21,13 @@ package manager
import (
"time"
"github.com/golang-jwt/jwt"
"github.com/golang-jwt/jwt/v5"
"github.com/pkg/errors"
"github.com/sethvargo/go-password/password"
)
type userToken struct {
jwt.StandardClaims
jwt.RegisteredClaims
User string `json:"user"`
Scope string `json:"scope"`
@@ -45,10 +45,10 @@ var (
func generateUserToken(user string, scope string, timeout int) (string, error) {
// Create a JWT as the user token
claims := userToken{
StandardClaims: jwt.StandardClaims{
ExpiresAt: time.Now().Add(time.Duration(timeout) * time.Second).Unix(),
RegisteredClaims: jwt.RegisteredClaims{
ExpiresAt: jwt.NewNumericDate(time.Now().Add(time.Duration(timeout) * time.Second)),
Issuer: tokenIssuer,
IssuedAt: time.Now().Unix(),
IssuedAt: jwt.NewNumericDate(time.Now()),
},
User: user,
Scope: scope,

View File

@@ -84,7 +84,7 @@ func New(m map[string]interface{}, stream events.Stream, log *zerolog.Logger) (s
return nil, fmt.Errorf("unknown metadata backend %s, only 'messagepack' or 'xattrs' (default) supported", o.MetadataBackend)
}
trashbin, err := trashbin.New(o, lu)
trashbin, err := trashbin.New(o, lu, log)
if err != nil {
return nil, err
}

View File

@@ -26,6 +26,9 @@ import (
"strings"
"time"
"github.com/google/uuid"
"github.com/rs/zerolog"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/cs3org/reva/v2/pkg/storage"
@@ -34,13 +37,13 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/google/uuid"
)
type Trashbin struct {
fs storage.FS
o *options.Options
lu *lookup.Lookup
fs storage.FS
o *options.Options
lu *lookup.Lookup
log *zerolog.Logger
}
const (
@@ -49,10 +52,11 @@ const (
)
// New returns a new Trashbin
func New(o *options.Options, lu *lookup.Lookup) (*Trashbin, error) {
func New(o *options.Options, lu *lookup.Lookup, log *zerolog.Logger) (*Trashbin, error) {
return &Trashbin{
o: o,
lu: lu,
o: o,
lu: lu,
log: log,
}, nil
}
@@ -261,7 +265,9 @@ func (tb *Trashbin) RestoreRecycleItem(ctx context.Context, ref *provider.Refere
if err != nil {
return err
}
_ = tb.lu.CacheID(ctx, n.SpaceID, string(id), restorePath)
if err := tb.lu.CacheID(ctx, n.SpaceID, string(id), restorePath); err != nil {
tb.log.Error().Err(err).Str("spaceID", n.SpaceID).Str("id", string(id)).Str("path", restorePath).Msg("trashbin: error caching id")
}
// cleanup trash info
if relativePath == "." || relativePath == "/" {

View File

@@ -165,6 +165,7 @@ func (t *Tree) Scan(path string, action EventAction, isDir bool) error {
// cases:
switch action {
case ActionCreate:
t.log.Debug().Str("path", path).Bool("isDir", isDir).Msg("scanning path (ActionCreate)")
if !isDir {
// 1. New file (could be emitted as part of a new directory)
// -> assimilate file
@@ -197,6 +198,7 @@ func (t *Tree) Scan(path string, action EventAction, isDir bool) error {
}
case ActionUpdate:
t.log.Debug().Str("path", path).Bool("isDir", isDir).Msg("scanning path (ActionUpdate)")
// 3. Updated file
// -> update file unless parent directory is being rescanned
if !t.scanDebouncer.InProgress(filepath.Dir(path)) {
@@ -207,6 +209,7 @@ func (t *Tree) Scan(path string, action EventAction, isDir bool) error {
}
case ActionMove:
t.log.Debug().Str("path", path).Bool("isDir", isDir).Msg("scanning path (ActionMove)")
// 4. Moved file
// -> update file
// 5. Moved directory
@@ -218,6 +221,7 @@ func (t *Tree) Scan(path string, action EventAction, isDir bool) error {
})
case ActionMoveFrom:
t.log.Debug().Str("path", path).Bool("isDir", isDir).Msg("scanning path (ActionMoveFrom)")
// 6. file/directory moved out of the watched directory
// -> update directory
if err := t.setDirty(filepath.Dir(path), true); err != nil {
@@ -227,9 +231,16 @@ func (t *Tree) Scan(path string, action EventAction, isDir bool) error {
go func() { _ = t.WarmupIDCache(filepath.Dir(path), false, true) }()
case ActionDelete:
_ = t.HandleFileDelete(path)
t.log.Debug().Str("path", path).Bool("isDir", isDir).Msg("handling deleted item")
// 7. Deleted file or directory
// -> update parent and all children
err := t.HandleFileDelete(path)
if err != nil {
return err
}
t.scanDebouncer.Debounce(scanItem{
Path: filepath.Dir(path),
ForceRescan: true,
@@ -242,8 +253,12 @@ func (t *Tree) Scan(path string, action EventAction, isDir bool) error {
func (t *Tree) HandleFileDelete(path string) error {
// purge metadata
_ = t.lookup.(*lookup.Lookup).IDCache.DeleteByPath(context.Background(), path)
_ = t.lookup.MetadataBackend().Purge(context.Background(), path)
if err := t.lookup.(*lookup.Lookup).IDCache.DeleteByPath(context.Background(), path); err != nil {
t.log.Error().Err(err).Str("path", path).Msg("could not delete id cache entry by path")
}
if err := t.lookup.MetadataBackend().Purge(context.Background(), path); err != nil {
t.log.Error().Err(err).Str("path", path).Msg("could not purge metadata")
}
// send event
owner, spaceID, nodeID, parentID, err := t.getOwnerAndIDs(filepath.Dir(path))
@@ -369,23 +384,36 @@ func (t *Tree) assimilate(item scanItem) error {
if ok && len(previousParentID) > 0 && previousPath != item.Path {
_, err := os.Stat(previousPath)
if err == nil {
// this id clashes with an existing id -> clear metadata and re-assimilate
// this id clashes with an existing item -> clear metadata and re-assimilate
t.log.Debug().Str("path", item.Path).Msg("ID clash detected, purging metadata and re-assimilating")
_ = t.lookup.MetadataBackend().Purge(context.Background(), item.Path)
if err := t.lookup.MetadataBackend().Purge(context.Background(), item.Path); err != nil {
t.log.Error().Err(err).Str("path", item.Path).Msg("could not purge metadata")
}
go func() {
_ = t.assimilate(scanItem{Path: item.Path, ForceRescan: true})
if err := t.assimilate(scanItem{Path: item.Path, ForceRescan: true}); err != nil {
t.log.Error().Err(err).Str("path", item.Path).Msg("could not re-assimilate")
}
}()
} else {
// this is a move
_ = t.lookup.(*lookup.Lookup).CacheID(context.Background(), spaceID, string(id), item.Path)
t.log.Debug().Str("path", item.Path).Msg("move detected")
if err := t.lookup.(*lookup.Lookup).CacheID(context.Background(), spaceID, string(id), item.Path); err != nil {
t.log.Error().Err(err).Str("spaceID", spaceID).Str("id", string(id)).Str("path", item.Path).Msg("could not cache id")
}
_, err := t.updateFile(item.Path, string(id), spaceID)
if err != nil {
return err
}
// purge original metadata. Only delete the path entry using DeletePath(reverse lookup), not the whole entry pair.
_ = t.lookup.(*lookup.Lookup).IDCache.DeletePath(context.Background(), previousPath)
_ = t.lookup.MetadataBackend().Purge(context.Background(), previousPath)
if err := t.lookup.(*lookup.Lookup).IDCache.DeletePath(context.Background(), previousPath); err != nil {
t.log.Error().Err(err).Str("path", previousPath).Msg("could not delete id cache entry by path")
}
if err := t.lookup.MetadataBackend().Purge(context.Background(), previousPath); err != nil {
t.log.Error().Err(err).Str("path", previousPath).Msg("could not purge metadata")
}
fi, err := os.Stat(item.Path)
if err != nil {
@@ -393,7 +421,11 @@ func (t *Tree) assimilate(item scanItem) error {
}
if fi.IsDir() {
// if it was moved and it is a directory we need to propagate the move
go func() { _ = t.WarmupIDCache(item.Path, false, true) }()
go func() {
if err := t.WarmupIDCache(item.Path, false, true); err != nil {
t.log.Error().Err(err).Str("path", item.Path).Msg("could not warmup id cache")
}
}()
}
parentID, err := t.lookup.MetadataBackend().Get(context.Background(), item.Path, prefixes.ParentidAttr)
@@ -426,7 +458,10 @@ func (t *Tree) assimilate(item scanItem) error {
}
} else {
// This item had already been assimilated in the past. Update the path
_ = t.lookup.(*lookup.Lookup).CacheID(context.Background(), spaceID, string(id), item.Path)
t.log.Debug().Str("path", item.Path).Msg("updating cached path")
if err := t.lookup.(*lookup.Lookup).CacheID(context.Background(), spaceID, string(id), item.Path); err != nil {
t.log.Error().Err(err).Str("spaceID", spaceID).Str("id", string(id)).Str("path", item.Path).Msg("could not cache id")
}
_, err := t.updateFile(item.Path, string(id), spaceID)
if err != nil {
@@ -434,6 +469,7 @@ func (t *Tree) assimilate(item scanItem) error {
}
}
} else {
t.log.Debug().Str("path", item.Path).Msg("new item detected")
// assimilate new file
newId := uuid.New().String()
fi, err := t.updateFile(item.Path, newId, spaceID)
@@ -550,12 +586,15 @@ assimilate:
return nil, errors.Wrap(err, "failed to propagate")
}
t.log.Debug().Str("path", path).Interface("attributes", attributes).Msg("setting attributes")
err = t.lookup.MetadataBackend().SetMultiple(context.Background(), path, attributes, false)
if err != nil {
return nil, errors.Wrap(err, "failed to set attributes")
}
_ = t.lookup.(*lookup.Lookup).CacheID(context.Background(), spaceID, id, path)
if err := t.lookup.(*lookup.Lookup).CacheID(context.Background(), spaceID, id, path); err != nil {
t.log.Error().Err(err).Str("spaceID", spaceID).Str("id", id).Str("path", path).Msg("could not cache id")
}
return fi, nil
}
@@ -654,10 +693,14 @@ func (t *Tree) WarmupIDCache(root string, assimilate, onlyDirty bool) error {
_ = t.assimilate(scanItem{Path: path, ForceRescan: true})
}
}
_ = t.lookup.(*lookup.Lookup).CacheID(context.Background(), string(spaceID), string(id), path)
if err := t.lookup.(*lookup.Lookup).CacheID(context.Background(), string(spaceID), string(id), path); err != nil {
t.log.Error().Err(err).Str("spaceID", string(spaceID)).Str("id", string(id)).Str("path", path).Msg("could not cache id")
}
}
} else if assimilate {
_ = t.assimilate(scanItem{Path: path, ForceRescan: true})
if err := t.assimilate(scanItem{Path: path, ForceRescan: true}); err != nil {
t.log.Error().Err(err).Str("path", path).Msg("could not assimilate item")
}
}
return t.setDirty(path, false)
})
@@ -665,9 +708,13 @@ func (t *Tree) WarmupIDCache(root string, assimilate, onlyDirty bool) error {
for dir, size := range sizes {
if dir == root {
// Propagate the size diff further up the tree
_ = t.propagateSizeDiff(dir, size)
if err := t.propagateSizeDiff(dir, size); err != nil {
t.log.Error().Err(err).Str("path", dir).Msg("could not propagate size diff")
}
}
_ = t.lookup.MetadataBackend().Set(context.Background(), dir, prefixes.TreesizeAttr, []byte(fmt.Sprintf("%d", size)))
if err := t.lookup.MetadataBackend().Set(context.Background(), dir, prefixes.TreesizeAttr, []byte(fmt.Sprintf("%d", size))); err != nil {
t.log.Error().Err(err).Str("path", dir).Int64("size", size).Msg("could not set tree size")
}
}
if err != nil {

View File

@@ -25,10 +25,13 @@ import (
"os"
"strconv"
"time"
"github.com/rs/zerolog"
)
type GpfsFileAuditLoggingWatcher struct {
tree *Tree
log *zerolog.Logger
}
type lwe struct {
@@ -37,9 +40,10 @@ type lwe struct {
BytesWritten string
}
func NewGpfsFileAuditLoggingWatcher(tree *Tree, auditLogFile string) (*GpfsFileAuditLoggingWatcher, error) {
func NewGpfsFileAuditLoggingWatcher(tree *Tree, auditLogFile string, log *zerolog.Logger) (*GpfsFileAuditLoggingWatcher, error) {
w := &GpfsFileAuditLoggingWatcher{
tree: tree,
log: log,
}
_, err := os.Stat(auditLogFile)
@@ -75,25 +79,33 @@ start:
case nil:
err := json.Unmarshal([]byte(line), ev)
if err != nil {
w.log.Error().Err(err).Str("line", line).Msg("error unmarshalling line")
continue
}
if isLockFile(ev.Path) || isTrash(ev.Path) || w.tree.isUpload(ev.Path) {
continue
}
switch ev.Event {
case "CREATE":
go func() { _ = w.tree.Scan(ev.Path, ActionCreate, false) }()
case "CLOSE":
bytesWritten, err := strconv.Atoi(ev.BytesWritten)
if err == nil && bytesWritten > 0 {
go func() { _ = w.tree.Scan(ev.Path, ActionUpdate, false) }()
go func() {
switch ev.Event {
case "CREATE":
err = w.tree.Scan(ev.Path, ActionCreate, false)
case "CLOSE":
var bytesWritten int
bytesWritten, err = strconv.Atoi(ev.BytesWritten)
if err == nil && bytesWritten > 0 {
err = w.tree.Scan(ev.Path, ActionUpdate, false)
}
case "RENAME":
err = w.tree.Scan(ev.Path, ActionMove, false)
if warmupErr := w.tree.WarmupIDCache(ev.Path, false, false); warmupErr != nil {
w.log.Error().Err(warmupErr).Str("path", ev.Path).Msg("error warming up id cache")
}
}
case "RENAME":
go func() {
_ = w.tree.Scan(ev.Path, ActionMove, false)
_ = w.tree.WarmupIDCache(ev.Path, false, false)
}()
}
if err != nil {
w.log.Error().Err(err).Str("line", line).Msg("error unmarshalling line")
}
}()
case io.EOF:
time.Sleep(1 * time.Second)
default:

View File

@@ -25,18 +25,21 @@ import (
"strconv"
"strings"
"github.com/rs/zerolog"
kafka "github.com/segmentio/kafka-go"
)
type GpfsWatchFolderWatcher struct {
tree *Tree
brokers []string
log *zerolog.Logger
}
func NewGpfsWatchFolderWatcher(tree *Tree, kafkaBrokers []string) (*GpfsWatchFolderWatcher, error) {
func NewGpfsWatchFolderWatcher(tree *Tree, kafkaBrokers []string, log *zerolog.Logger) (*GpfsWatchFolderWatcher, error) {
return &GpfsWatchFolderWatcher{
tree: tree,
brokers: kafkaBrokers,
log: log,
}, nil
}
@@ -66,23 +69,27 @@ func (w *GpfsWatchFolderWatcher) Watch(topic string) {
go func() {
isDir := strings.Contains(lwev.Event, "IN_ISDIR")
var err error
switch {
case strings.Contains(lwev.Event, "IN_DELETE"):
_ = w.tree.Scan(lwev.Path, ActionDelete, isDir)
err = w.tree.Scan(lwev.Path, ActionDelete, isDir)
case strings.Contains(lwev.Event, "IN_MOVE_FROM"):
_ = w.tree.Scan(lwev.Path, ActionMoveFrom, isDir)
err = w.tree.Scan(lwev.Path, ActionMoveFrom, isDir)
case strings.Contains(lwev.Event, "IN_CREATE"):
_ = w.tree.Scan(lwev.Path, ActionCreate, isDir)
err = w.tree.Scan(lwev.Path, ActionCreate, isDir)
case strings.Contains(lwev.Event, "IN_CLOSE_WRITE"):
bytesWritten, err := strconv.Atoi(lwev.BytesWritten)
if err == nil && bytesWritten > 0 {
_ = w.tree.Scan(lwev.Path, ActionUpdate, isDir)
bytesWritten, convErr := strconv.Atoi(lwev.BytesWritten)
if convErr == nil && bytesWritten > 0 {
err = w.tree.Scan(lwev.Path, ActionUpdate, isDir)
}
case strings.Contains(lwev.Event, "IN_MOVED_TO"):
_ = w.tree.Scan(lwev.Path, ActionMove, isDir)
err = w.tree.Scan(lwev.Path, ActionMove, isDir)
}
if err != nil {
w.log.Error().Err(err).Str("path", lwev.Path).Msg("error scanning path")
}
}()
}
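A subtle fix hides in the IN_CLOSE_WRITE case: the old bytesWritten, err := strconv.Atoi(...) declared a fresh err scoped to that case with :=, shadowing the outer variable, so the shared error check after the switch could never see a failure from that branch. Renaming the conversion error to convErr and assigning the outer err only from Scan removes the shadow. The gotcha, distilled (surrounding names simplified):

var err error
switch {
case strings.Contains(event, "IN_CLOSE_WRITE"):
	// wrong: 'bytesWritten, err := strconv.Atoi(raw)' would shadow the outer err
	bytesWritten, convErr := strconv.Atoi(raw)
	if convErr == nil && bytesWritten > 0 {
		err = tree.Scan(path, ActionUpdate, isDir)
	}
}
if err != nil {
	log.Error().Err(err).Str("path", path).Msg("error scanning path")
}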

View File

@@ -22,15 +22,18 @@ import (
"fmt"
"github.com/pablodz/inotifywaitgo/inotifywaitgo"
"github.com/rs/zerolog"
)
type InotifyWatcher struct {
tree *Tree
log *zerolog.Logger
}
func NewInotifyWatcher(tree *Tree) *InotifyWatcher {
func NewInotifyWatcher(tree *Tree, log *zerolog.Logger) *InotifyWatcher {
return &InotifyWatcher{
tree: tree,
log: log,
}
}
@@ -60,20 +63,29 @@ func (iw *InotifyWatcher) Watch(path string) {
for {
select {
case event := <-events:
if isLockFile(event.Filename) || isTrash(event.Filename) || iw.tree.isUpload(event.Filename) {
continue
}
for _, e := range event.Events {
if isLockFile(event.Filename) || isTrash(event.Filename) || iw.tree.isUpload(event.Filename) {
continue
}
go func() {
var err error
switch e {
case inotifywaitgo.DELETE:
_ = iw.tree.Scan(event.Filename, ActionDelete, event.IsDir)
err = iw.tree.Scan(event.Filename, ActionDelete, event.IsDir)
case inotifywaitgo.MOVED_FROM:
_ = iw.tree.Scan(event.Filename, ActionMoveFrom, event.IsDir)
err = iw.tree.Scan(event.Filename, ActionMoveFrom, event.IsDir)
case inotifywaitgo.CREATE, inotifywaitgo.MOVED_TO:
_ = iw.tree.Scan(event.Filename, ActionCreate, event.IsDir)
err = iw.tree.Scan(event.Filename, ActionCreate, event.IsDir)
case inotifywaitgo.CLOSE_WRITE:
_ = iw.tree.Scan(event.Filename, ActionUpdate, event.IsDir)
err = iw.tree.Scan(event.Filename, ActionUpdate, event.IsDir)
case inotifywaitgo.CLOSE:
// ignore, already handled by CLOSE_WRITE
default:
iw.log.Warn().Interface("event", event).Msg("unhandled event")
return
}
if err != nil {
iw.log.Error().Err(err).Str("path", event.Filename).Msg("error scanning file")
}
}()
}

View File

@@ -121,17 +121,17 @@ func New(lu node.PathLookup, bs Blobstore, um usermapper.Mapper, trashbin *trash
var err error
switch o.WatchType {
case "gpfswatchfolder":
t.watcher, err = NewGpfsWatchFolderWatcher(t, strings.Split(o.WatchFolderKafkaBrokers, ","))
t.watcher, err = NewGpfsWatchFolderWatcher(t, strings.Split(o.WatchFolderKafkaBrokers, ","), log)
if err != nil {
return nil, err
}
case "gpfsfileauditlogging":
t.watcher, err = NewGpfsFileAuditLoggingWatcher(t, o.WatchPath)
t.watcher, err = NewGpfsFileAuditLoggingWatcher(t, o.WatchPath, log)
if err != nil {
return nil, err
}
default:
t.watcher = NewInotifyWatcher(t)
t.watcher = NewInotifyWatcher(t, log)
watchPath = o.Root
}
@@ -213,7 +213,9 @@ func (t *Tree) TouchFile(ctx context.Context, n *node.Node, markprocessing bool,
n.SetType(provider.ResourceType_RESOURCE_TYPE_FILE)
// Set id in cache
_ = t.lookup.(*lookup.Lookup).CacheID(context.Background(), n.SpaceID, n.ID, nodePath)
if err := t.lookup.(*lookup.Lookup).CacheID(context.Background(), n.SpaceID, n.ID, nodePath); err != nil {
t.log.Error().Err(err).Str("spaceID", n.SpaceID).Str("id", n.ID).Str("path", nodePath).Msg("could not cache id")
}
if err := os.MkdirAll(filepath.Dir(nodePath), 0700); err != nil {
return errors.Wrap(err, "Decomposedfs: error creating node")
@@ -312,7 +314,9 @@ func (t *Tree) Move(ctx context.Context, oldNode *node.Node, newNode *node.Node)
if newNode.ID == "" {
newNode.ID = oldNode.ID
}
_ = t.lookup.(*lookup.Lookup).CacheID(ctx, newNode.SpaceID, newNode.ID, filepath.Join(newNode.ParentPath(), newNode.Name))
if err := t.lookup.(*lookup.Lookup).CacheID(ctx, newNode.SpaceID, newNode.ID, filepath.Join(newNode.ParentPath(), newNode.Name)); err != nil {
t.log.Error().Err(err).Str("spaceID", newNode.SpaceID).Str("id", newNode.ID).Str("path", filepath.Join(newNode.ParentPath(), newNode.Name)).Msg("could not cache id")
}
// rename the lock (if it exists)
if _, err := os.Stat(oldNode.LockFilePath()); err == nil {
@@ -476,7 +480,11 @@ func (t *Tree) Delete(ctx context.Context, n *node.Node) error {
}
// remove entry from cache immediately to avoid inconsistencies
defer func() { _ = t.idCache.Delete(path) }()
defer func() {
if err := t.idCache.Delete(path); err != nil {
log.Error().Err(err).Str("path", path).Msg("could not delete id from cache")
}
}()
if appctx.DeletingSharedResourceFromContext(ctx) {
src := filepath.Join(n.ParentPath(), n.Name)
@@ -495,7 +503,9 @@ func (t *Tree) Delete(ctx context.Context, n *node.Node) error {
}
// Remove lock file if it exists
_ = os.Remove(n.LockFilePath())
if err := os.Remove(n.LockFilePath()); err != nil {
log.Error().Err(err).Str("path", n.LockFilePath()).Msg("could not remove lock file")
}
err := t.trashbin.MoveToTrash(ctx, n, path)
if err != nil {
@@ -748,7 +758,9 @@ func (t *Tree) createDirNode(ctx context.Context, n *node.Node) (err error) {
return errors.Wrap(err, "Decomposedfs: error creating node")
}
_ = idcache.Set(ctx, n.SpaceID, n.ID, path)
if err := idcache.Set(ctx, n.SpaceID, n.ID, path); err != nil {
log.Error().Err(err).Str("spaceID", n.SpaceID).Str("id", n.ID).Str("path", path).Msg("could not cache id")
}
attributes := n.NodeMetadata(ctx)
attributes[prefixes.IDAttr] = []byte(n.ID)

View File

@@ -235,14 +235,17 @@ func (lu *Lookup) GenerateSpaceID(spaceType string, owner *user.User) (string, e
// Path returns the path for node
func (lu *Lookup) Path(ctx context.Context, n *node.Node, hasPermission node.PermissionFunc) (p string, err error) {
root := n.SpaceRoot
var child *node.Node
for n.ID != root.ID {
p = filepath.Join(n.Name, p)
child = n
if n, err = n.Parent(ctx); err != nil {
appctx.GetLogger(ctx).
Error().Err(err).
Str("path", p).
Str("spaceid", n.SpaceID).
Str("nodeid", n.ID).
Str("spaceid", child.SpaceID).
Str("nodeid", child.ID).
Str("parentid", child.ParentID).
Msg("Path()")
return
}

View File

@@ -312,11 +312,8 @@ func (store OcisStore) updateExistingNode(ctx context.Context, session *OcisSess
}
unlock := func() error {
err := f.Close()
if err != nil {
return err
}
return os.Remove(store.lu.MetadataBackend().LockfilePath(targetPath))
// NOTE: to prevent stale NFS file handles do not remove lock file!
return f.Close()
}
old, _ := node.ReadNode(ctx, store.lu, spaceID, n.ID, false, nil, false)
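Background for the comment above: on NFS, unlinking a lock file while another client still holds an open handle to it can later surface as stale file handle (ESTALE) errors, because the handle references an inode whose name is gone. Keeping the lock file on disk and only closing the descriptor sidesteps that failure mode.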

View File

@@ -33,7 +33,7 @@ import (
userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/golang-jwt/jwt"
"github.com/golang-jwt/jwt/v5"
"github.com/pkg/errors"
tusd "github.com/tus/tusd/v2/pkg/handler"
"go.opentelemetry.io/otel"
@@ -372,17 +372,17 @@ func (session *OcisSession) Cleanup(revertNodeMetadata, cleanBin, cleanInfo bool
// URL returns a url to download an upload
func (session *OcisSession) URL(_ context.Context) (string, error) {
type transferClaims struct {
jwt.StandardClaims
jwt.RegisteredClaims
Target string `json:"target"`
}
u := joinurl(session.store.tknopts.DownloadEndpoint, "tus/", session.ID())
ttl := time.Duration(session.store.tknopts.TransferExpires) * time.Second
claims := transferClaims{
StandardClaims: jwt.StandardClaims{
ExpiresAt: time.Now().Add(ttl).Unix(),
Audience: "reva",
IssuedAt: time.Now().Unix(),
RegisteredClaims: jwt.RegisteredClaims{
ExpiresAt: jwt.NewNumericDate(time.Now().Add(ttl)),
Audience: jwt.ClaimStrings{"reva"},
IssuedAt: jwt.NewNumericDate(time.Now()),
},
Target: u,
}

View File

@@ -28,20 +28,22 @@ import (
"github.com/cs3org/reva/v2/pkg/sharedconf"
"github.com/cs3org/reva/v2/pkg/token"
"github.com/cs3org/reva/v2/pkg/token/manager/registry"
"github.com/golang-jwt/jwt"
"github.com/golang-jwt/jwt/v5"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
)
const defaultExpiration int64 = 86400 // 1 day
const defaultLeeway int64 = 5 // 5 seconds
func init() {
registry.Register("jwt", New)
}
type config struct {
Secret string `mapstructure:"secret"`
Expires int64 `mapstructure:"expires"`
Secret string `mapstructure:"secret"`
Expires int64 `mapstructure:"expires"`
tokenTimeLeeway int64 `mapstructure:"token_leeway"`
}
type manager struct {
@@ -50,7 +52,7 @@ type manager struct {
// claims are custom claims for the JWT token.
type claims struct {
jwt.StandardClaims
jwt.RegisteredClaims
User *user.User `json:"user"`
Scope map[string]*auth.Scope `json:"scope"`
}
@@ -75,6 +77,10 @@ func New(value map[string]interface{}) (token.Manager, error) {
c.Expires = defaultExpiration
}
if c.tokenTimeLeeway == 0 {
c.tokenTimeLeeway = defaultLeeway
}
c.Secret = sharedconf.GetJWTSecret(c.Secret)
if c.Secret == "" {
@@ -87,31 +93,32 @@ func New(value map[string]interface{}) (token.Manager, error) {
func (m *manager) MintToken(ctx context.Context, u *user.User, scope map[string]*auth.Scope) (string, error) {
ttl := time.Duration(m.conf.Expires) * time.Second
claims := claims{
StandardClaims: jwt.StandardClaims{
ExpiresAt: time.Now().Add(ttl).Unix(),
newClaims := claims{
RegisteredClaims: jwt.RegisteredClaims{
ExpiresAt: jwt.NewNumericDate(time.Now().Add(ttl)),
Issuer: u.Id.Idp,
Audience: "reva",
IssuedAt: time.Now().Unix(),
Audience: jwt.ClaimStrings{"reva"},
IssuedAt: jwt.NewNumericDate(time.Now()),
},
User: u,
Scope: scope,
}
t := jwt.NewWithClaims(jwt.GetSigningMethod("HS256"), claims)
t := jwt.NewWithClaims(jwt.GetSigningMethod("HS256"), newClaims)
tkn, err := t.SignedString([]byte(m.conf.Secret))
if err != nil {
return "", errors.Wrapf(err, "error signing token with claims %+v", claims)
return "", errors.Wrapf(err, "error signing token with claims %+v", newClaims)
}
return tkn, nil
}
func (m *manager) DismantleToken(ctx context.Context, tkn string) (*user.User, map[string]*auth.Scope, error) {
token, err := jwt.ParseWithClaims(tkn, &claims{}, func(token *jwt.Token) (interface{}, error) {
keyfunc := func(token *jwt.Token) (interface{}, error) {
return []byte(m.conf.Secret), nil
})
}
token, err := jwt.ParseWithClaims(tkn, &claims{}, keyfunc, jwt.WithLeeway(time.Duration(m.conf.tokenTimeLeeway)*time.Second))
if err != nil {
return nil, nil, errors.Wrap(err, "error parsing token")
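v5 also moves clock-skew tolerance out of the claims and into the parser: jwt.WithLeeway extends exp/iat/nbf validation by the given duration, which is what the new token_leeway option feeds. A minimal sketch of verification with the option, reusing the claims type from the hunk (values illustrative):

func verify(tkn, secret string, leewaySeconds int64) (*claims, error) {
	keyfunc := func(t *jwt.Token) (interface{}, error) {
		return []byte(secret), nil
	}
	// the leeway absorbs small clock differences between minting and verifying nodes
	token, err := jwt.ParseWithClaims(tkn, &claims{}, keyfunc,
		jwt.WithLeeway(time.Duration(leewaySeconds)*time.Second))
	if err != nil {
		return nil, errors.Wrap(err, "error parsing token")
	}
	c, ok := token.Claims.(*claims)
	if !ok || !token.Valid {
		return nil, errors.New("invalid token")
	}
	return c, nil
}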