enhancement: bump reva and remove unnecessary explicit automount mount point naming

This commit is contained in:
Florian Schade
2024-06-13 15:21:33 +02:00
parent 73c0b2a553
commit 1f012ac9b5
8 changed files with 203 additions and 208 deletions
@@ -20,8 +20,11 @@ package usershareprovider
import (
"context"
"path/filepath"
"regexp"
"slices"
"strconv"
"strings"
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
@@ -46,6 +49,12 @@ import (
"github.com/cs3org/reva/v2/pkg/utils"
)
const (
_fieldMaskPathMountPoint = "mount_point"
_fieldMaskPathPermissions = "permissions"
_fieldMaskPathState = "state"
)
func init() {
rgrpc.Register("usershareprovider", NewDefault)
}
@@ -407,7 +416,7 @@ func (s *service) UpdateShare(ctx context.Context, req *collaboration.UpdateShar
// If this is a permissions update, check if user's permissions on the resource are sufficient to set the desired permissions
var newPermissions *provider.ResourcePermissions
if slices.Contains(req.GetUpdateMask().GetPaths(), "permissions") {
if slices.Contains(req.GetUpdateMask().GetPaths(), _fieldMaskPathPermissions) {
newPermissions = req.GetShare().GetPermissions().GetPermissions()
} else {
newPermissions = req.GetField().GetPermissions().GetPermissions()
@@ -497,40 +506,170 @@ func (s *service) GetReceivedShare(ctx context.Context, req *collaboration.GetRe
}
func (s *service) UpdateReceivedShare(ctx context.Context, req *collaboration.UpdateReceivedShareRequest) (*collaboration.UpdateReceivedShareResponse, error) {
if req.Share == nil {
return &collaboration.UpdateReceivedShareResponse{
Status: status.NewInvalid(ctx, "updating requires a received share object"),
}, nil
}
if req.Share.Share == nil {
return &collaboration.UpdateReceivedShareResponse{
Status: status.NewInvalid(ctx, "share missing"),
}, nil
}
if req.Share.Share.Id == nil {
return &collaboration.UpdateReceivedShareResponse{
Status: status.NewInvalid(ctx, "share id missing"),
}, nil
}
if req.Share.Share.Id.OpaqueId == "" {
if req.GetShare().GetShare().GetId().GetOpaqueId() == "" {
return &collaboration.UpdateReceivedShareResponse{
Status: status.NewInvalid(ctx, "share id empty"),
}, nil
}
isStateTransitionShareAccepted := slices.Contains(req.GetUpdateMask().GetPaths(), _fieldMaskPathState) && req.GetShare().GetState() == collaboration.ShareState_SHARE_STATE_ACCEPTED
isMountPointSet := slices.Contains(req.GetUpdateMask().GetPaths(), _fieldMaskPathMountPoint) && req.GetShare().GetMountPoint().GetPath() != ""
// we calculate a valid mountpoint only if the share should be accepted and the mount point is not set explicitly
if isStateTransitionShareAccepted && !isMountPointSet {
gatewayClient, err := s.gatewaySelector.Next()
if err != nil {
return nil, err
}
s, err := setReceivedShareMountPoint(ctx, gatewayClient, req)
switch {
case err != nil:
fallthrough
case s.GetCode() != rpc.Code_CODE_OK:
return &collaboration.UpdateReceivedShareResponse{
Status: s,
}, err
}
}
var uid userpb.UserId
_ = utils.ReadJSONFromOpaque(req.Opaque, "userid", &uid)
share, err := s.sm.UpdateReceivedShare(ctx, req.Share, req.UpdateMask, &uid)
updatedShare, err := s.sm.UpdateReceivedShare(ctx, req.Share, req.UpdateMask, &uid)
if err != nil {
return &collaboration.UpdateReceivedShareResponse{
Status: status.NewInternal(ctx, "error updating received share"),
}, nil
}
res := &collaboration.UpdateReceivedShareResponse{
return &collaboration.UpdateReceivedShareResponse{
Status: status.NewOK(ctx),
Share: share,
}
return res, nil
Share: updatedShare,
}, nil
}
// GetAvailableMountpoint returns a new or existing mountpoint
func GetAvailableMountpoint(ctx context.Context, gwc gateway.GatewayAPIClient, id *provider.ResourceId, name string, userId *userpb.UserId) (string, error) {
listReceivedSharesReq := &collaboration.ListReceivedSharesRequest{}
if userId != nil {
listReceivedSharesReq.Opaque = utils.AppendJSONToOpaque(nil, "userid", userId)
}
listReceivedSharesRes, err := gwc.ListReceivedShares(ctx, listReceivedSharesReq)
if err != nil {
return "", errtypes.InternalError("grpc list received shares request failed")
}
if err := errtypes.NewErrtypeFromStatus(listReceivedSharesRes.GetStatus()); err != nil {
return "", err
}
base := filepath.Clean(name)
mount := base
existingMountpoint := ""
mountedShares := make([]string, 0, len(listReceivedSharesRes.GetShares()))
var pathExists bool
for _, s := range listReceivedSharesRes.GetShares() {
resourceIDEqual := utils.ResourceIDEqual(s.GetShare().GetResourceId(), id)
if resourceIDEqual && s.State == collaboration.ShareState_SHARE_STATE_ACCEPTED {
// a share to the resource already exists and is mounted, remembers the mount point
_, err := utils.GetResourceByID(ctx, s.GetShare().GetResourceId(), gwc)
if err == nil {
existingMountpoint = s.GetMountPoint().GetPath()
}
}
if s.State == collaboration.ShareState_SHARE_STATE_ACCEPTED {
// collect all accepted mount points
mountedShares = append(mountedShares, s.GetMountPoint().GetPath())
if s.GetMountPoint().GetPath() == mount {
// does the shared resource still exist?
_, err := utils.GetResourceByID(ctx, s.GetShare().GetResourceId(), gwc)
if err == nil {
pathExists = true
}
// TODO we could delete shares here if the stat returns code NOT FOUND ... but listening for file deletes would be better
}
}
}
if existingMountpoint != "" {
// we want to reuse the same mountpoint for all unmounted shares to the same resource
return existingMountpoint, nil
}
// If the mount point really already exists, we need to insert a number into the filename
if pathExists {
// now we have a list of shares, we want to iterate over all of them and check for name collisions agents a mount points list
for i := 1; i <= len(mountedShares)+1; i++ {
ext := filepath.Ext(base)
name := strings.TrimSuffix(base, ext)
mount = name + " (" + strconv.Itoa(i) + ")" + ext
if !slices.Contains(mountedShares, mount) {
return mount, nil
}
}
}
return mount, nil
}
func setReceivedShareMountPoint(ctx context.Context, gwc gateway.GatewayAPIClient, req *collaboration.UpdateReceivedShareRequest) (*rpc.Status, error) {
receivedShare, err := gwc.GetReceivedShare(ctx, &collaboration.GetReceivedShareRequest{
Ref: &collaboration.ShareReference{
Spec: &collaboration.ShareReference_Id{
Id: req.GetShare().GetShare().GetId(),
},
},
})
switch {
case err != nil:
fallthrough
case receivedShare.GetStatus().GetCode() != rpc.Code_CODE_OK:
return receivedShare.GetStatus(), err
}
if receivedShare.GetShare().GetMountPoint().GetPath() != "" {
return status.NewOK(ctx), nil
}
resourceStat, err := gwc.Stat(ctx, &provider.StatRequest{
Ref: &provider.Reference{
ResourceId: receivedShare.GetShare().GetShare().GetResourceId(),
},
})
switch {
case err != nil:
fallthrough
case resourceStat.GetStatus().GetCode() != rpc.Code_CODE_OK:
return resourceStat.GetStatus(), err
}
// handle mount point related updates
{
var userID *userpb.UserId
_ = utils.ReadJSONFromOpaque(req.Opaque, "userid", &userID)
// check if the requested mount point is available and if not, find a suitable one
availableMountpoint, err := GetAvailableMountpoint(ctx, gwc,
resourceStat.GetInfo().GetId(),
resourceStat.GetInfo().GetName(),
userID,
)
if err != nil {
return status.NewInternal(ctx, err.Error()), nil
}
if !slices.Contains(req.GetUpdateMask().GetPaths(), _fieldMaskPathMountPoint) {
req.GetUpdateMask().Paths = append(req.GetUpdateMask().GetPaths(), _fieldMaskPathMountPoint)
}
req.GetShare().MountPoint = &provider.Reference{
Path: availableMountpoint,
}
}
return status.NewOK(ctx), nil
}
@@ -23,24 +23,22 @@ import (
"fmt"
"net/http"
"path"
"path/filepath"
"slices"
"strconv"
"strings"
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1"
ocmv1beta1 "github.com/cs3org/go-cs3apis/cs3/sharing/ocm/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/go-chi/chi/v5"
"github.com/pkg/errors"
"google.golang.org/protobuf/types/known/fieldmaskpb"
"github.com/cs3org/reva/v2/internal/http/services/owncloud/ocs/response"
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/conversions"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/go-chi/chi/v5"
"github.com/pkg/errors"
"google.golang.org/protobuf/types/known/fieldmaskpb"
)
const (
@@ -76,7 +74,7 @@ func (h *Handler) AcceptReceivedShare(w http.ResponseWriter, r *http.Request) {
return
}
mount, unmountedShares, err := GetMountpointAndUnmountedShares(ctx, client, sharedResource.Info)
unmountedShares, err := getUnmountedShares(ctx, client, sharedResource.GetInfo().GetId())
if err != nil {
response.WriteOCSError(w, r, response.MetaServerError.StatusCode, "could not determine mountpoint", err)
return
@@ -84,12 +82,8 @@ func (h *Handler) AcceptReceivedShare(w http.ResponseWriter, r *http.Request) {
// first update the requested share
receivedShare.State = collaboration.ShareState_SHARE_STATE_ACCEPTED
// we need to add a path to the share
receivedShare.MountPoint = &provider.Reference{
Path: mount,
}
updateMask := &fieldmaskpb.FieldMask{Paths: []string{"state", "mount_point"}}
updateMask := &fieldmaskpb.FieldMask{Paths: []string{"state"}}
data, meta, err := h.updateReceivedShare(r.Context(), receivedShare, updateMask)
if err != nil {
// we log an error for affected shares, for the actual share we return an error
@@ -106,10 +100,6 @@ func (h *Handler) AcceptReceivedShare(w http.ResponseWriter, r *http.Request) {
}
rs.State = collaboration.ShareState_SHARE_STATE_ACCEPTED
// set the same mountpoint as for the requested received share
rs.MountPoint = &provider.Reference{
Path: mount,
}
_, _, err := h.updateReceivedShare(r.Context(), rs, updateMask)
if err != nil {
@@ -119,76 +109,6 @@ func (h *Handler) AcceptReceivedShare(w http.ResponseWriter, r *http.Request) {
}
}
// GetMountpointAndUnmountedShares returns a new or existing mountpoint for the given info and produces a list of unmounted received shares for the same resource
func GetMountpointAndUnmountedShares(ctx context.Context, gwc gateway.GatewayAPIClient, info *provider.ResourceInfo) (string, []*collaboration.ReceivedShare, error) {
unmountedShares := []*collaboration.ReceivedShare{}
receivedShares, err := listReceivedShares(ctx, gwc)
if err != nil {
return "", unmountedShares, err
}
// we need to sort the received shares by mount point in order to make things easier to evaluate.
base := filepath.Clean(info.Name)
mount := base
existingMountpoint := ""
mountedShares := make([]string, 0, len(receivedShares))
var pathExists bool
for _, s := range receivedShares {
resourceIDEqual := utils.ResourceIDEqual(s.GetShare().GetResourceId(), info.GetId())
if resourceIDEqual && s.State == collaboration.ShareState_SHARE_STATE_ACCEPTED {
// a share to the resource already exists and is mounted, remember the mount point
_, err := utils.GetResourceByID(ctx, s.GetShare().GetResourceId(), gwc)
if err == nil {
existingMountpoint = s.GetMountPoint().GetPath()
}
}
if resourceIDEqual && s.State != collaboration.ShareState_SHARE_STATE_ACCEPTED {
// a share to the resource already exists but is not mounted, collect the unmounted share
unmountedShares = append(unmountedShares, s)
}
if s.State == collaboration.ShareState_SHARE_STATE_ACCEPTED {
// collect all accepted mount points
mountedShares = append(mountedShares, s.GetMountPoint().GetPath())
if s.GetMountPoint().GetPath() == mount {
// does the shared resource still exist?
_, err := utils.GetResourceByID(ctx, s.GetShare().GetResourceId(), gwc)
if err == nil {
pathExists = true
}
// TODO we could delete shares here if the stat returns code NOT FOUND ... but listening for file deletes would be better
}
}
}
if existingMountpoint != "" {
// we want to reuse the same mountpoint for all unmounted shares to the same resource
return existingMountpoint, unmountedShares, nil
}
// If the mount point really already exists, we need to insert a number into the filename
if pathExists {
// now we have a list of shares, we want to iterate over all of them and check for name collisions agents a mount points list
for i := 1; i <= len(mountedShares)+1; i++ {
ext := filepath.Ext(base)
name := strings.TrimSuffix(base, ext)
// be smart about .tar.(gz|bz) files
if strings.HasSuffix(name, ".tar") {
name = strings.TrimSuffix(name, ".tar")
ext = ".tar" + ext
}
mount = name + " (" + strconv.Itoa(i) + ")" + ext
if !slices.Contains(mountedShares, mount) {
return mount, unmountedShares, nil
}
}
}
return mount, unmountedShares, nil
}
// RejectReceivedShare handles DELETE Requests on /apps/files_sharing/api/v1/shares/{shareid}
func (h *Handler) RejectReceivedShare(w http.ResponseWriter, r *http.Request) {
shareID := chi.URLParam(r, "shareid")
@@ -394,6 +314,25 @@ func getReceivedShareFromID(ctx context.Context, client gateway.GatewayAPIClient
return s.Share, nil
}
func getUnmountedShares(ctx context.Context, gwc gateway.GatewayAPIClient, id *provider.ResourceId) ([]*collaboration.ReceivedShare, error) {
var unmountedShares []*collaboration.ReceivedShare
receivedShares, err := listReceivedShares(ctx, gwc)
if err != nil {
return unmountedShares, err
}
for _, s := range receivedShares {
resourceIDEqual := utils.ResourceIDEqual(s.GetShare().GetResourceId(), id)
if resourceIDEqual && s.State != collaboration.ShareState_SHARE_STATE_ACCEPTED {
// a share to the resource already exists but is not mounted, collect the unmounted share
unmountedShares = append(unmountedShares, s)
}
}
return unmountedShares, err
}
// getSharedResource attempts to get a shared resource from the storage from the resource reference.
func getSharedResource(ctx context.Context, client gateway.GatewayAPIClient, resID *provider.ResourceId) (*provider.StatResponse, *response.Response) {
res, err := client.Stat(ctx, &provider.StatRequest{
@@ -47,10 +47,11 @@ func (bs *Blobstore) Upload(node *node.Node, source string) error {
}
defer file.Close()
f, err := os.OpenFile(node.InternalPath(), os.O_CREATE|os.O_WRONLY, 0700)
f, err := os.OpenFile(node.InternalPath(), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0700)
if err != nil {
return errors.Wrapf(err, "could not open blob '%s' for writing", node.InternalPath())
}
defer f.Close()
w := bufio.NewWriter(f)
_, err = w.ReadFrom(file)