use new utils methods in userlog

Signed-off-by: jkoberg <jkoberg@owncloud.com>
This commit is contained in:
jkoberg
2023-09-05 15:03:10 +02:00
parent 732228ed88
commit 74f4143f0b
6 changed files with 70 additions and 314 deletions

View File

@@ -22,11 +22,10 @@ type Config struct {
TokenManager *TokenManager `yaml:"token_manager"`
MachineAuthAPIKey string `yaml:"machine_auth_api_key" env:"OCIS_MACHINE_AUTH_API_KEY;USERLOG_MACHINE_AUTH_API_KEY" desc:"Machine auth API key used to validate internal requests necessary to access resources from other services."`
RevaGateway string `yaml:"reva_gateway" env:"OCIS_REVA_GATEWAY" desc:"CS3 gateway used to look up user metadata"`
TranslationPath string `yaml:"translation_path" env:"OCIS_TRANSLATION_PATH;USERLOG_TRANSLATION_PATH" desc:"(optional) Set this to a path with custom translations to overwrite the builtin translations. Note that file and folder naming rules apply, see the documentation for more details."`
Events Events `yaml:"events"`
Persistence Persistence `yaml:"persistence"`
RevaGateway string `yaml:"reva_gateway" env:"OCIS_REVA_GATEWAY" desc:"CS3 gateway used to look up user metadata"`
TranslationPath string `yaml:"translation_path" env:"OCIS_TRANSLATION_PATH;USERLOG_TRANSLATION_PATH" desc:"(optional) Set this to a path with custom translations to overwrite the builtin translations. Note that file and folder naming rules apply, see the documentation for more details."`
Events Events `yaml:"events"`
Persistence Persistence `yaml:"persistence"`
DisableSSE bool `yaml:"disable_sse" env:"OCIS_DISABLE_SSE,USERLOG_DISABLE_SSE" desc:"Disables server-sent events (sse). When disabled, clients will no longer receive sse notifications."`

View File

@@ -69,10 +69,6 @@ func EnsureDefaults(cfg *config.Config) {
cfg.Log = &config.Log{}
}
if cfg.MachineAuthAPIKey == "" && cfg.Commons != nil && cfg.Commons.MachineAuthAPIKey != "" {
cfg.MachineAuthAPIKey = cfg.Commons.MachineAuthAPIKey
}
if cfg.GRPCClientTLS == nil && cfg.Commons != nil {
cfg.GRPCClientTLS = structs.CopyOrZeroValue(cfg.Commons.GRPCClientTLS)
}

View File

@@ -35,10 +35,6 @@ func ParseConfig(cfg *config.Config) error {
// Validate validates the config
func Validate(cfg *config.Config) error {
if cfg.MachineAuthAPIKey == "" {
return shared.MissingMachineAuthApiKeyError(cfg.Service.Name)
}
if cfg.TokenManager.JWTSecret == "" {
return shared.MissingJWTTokenError(cfg.Service.Name)
}

View File

@@ -16,7 +16,6 @@ import (
collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1"
storageprovider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/leonelquinteros/gotext"
@@ -52,32 +51,29 @@ type OC10Notification struct {
// Converter is responsible for converting eventhistory events to OC10Notifications
type Converter struct {
locale string
gatewaySelector pool.Selectable[gateway.GatewayAPIClient]
serviceAccountID string
serviceAccountSecret string
serviceName string
translationPath string
locale string
gwc gateway.GatewayAPIClient
serviceName string
translationPath string
serviceAccountContext context.Context
// cached within one request not to query other service too much
spaces map[string]*storageprovider.StorageSpace
users map[string]*user.User
resources map[string]*storageprovider.ResourceInfo
serviceAccountContext context.Context
spaces map[string]*storageprovider.StorageSpace
users map[string]*user.User
resources map[string]*storageprovider.ResourceInfo
}
// NewConverter returns a new Converter
func NewConverter(loc string, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], machineAuthAPIKey string, name string, translationPath string, serviceAccountID string, serviceAccountSecret string) *Converter {
func NewConverter(ctx context.Context, loc string, gwc gateway.GatewayAPIClient, name string, translationPath string) *Converter {
return &Converter{
locale: loc,
gatewaySelector: gatewaySelector,
serviceAccountID: serviceAccountID,
serviceAccountSecret: serviceAccountSecret,
serviceName: name,
translationPath: translationPath,
spaces: make(map[string]*storageprovider.StorageSpace),
users: make(map[string]*user.User),
resources: make(map[string]*storageprovider.ResourceInfo),
locale: loc,
gwc: gwc,
serviceName: name,
translationPath: translationPath,
serviceAccountContext: ctx,
spaces: make(map[string]*storageprovider.StorageSpace),
users: make(map[string]*user.User),
resources: make(map[string]*storageprovider.ResourceInfo),
}
}
@@ -172,12 +168,7 @@ func (c *Converter) spaceMessage(eventid string, nt NotificationTemplate, execut
return OC10Notification{}, err
}
ctx, err := c.authenticate()
if err != nil {
return OC10Notification{}, err
}
space, err := c.getSpace(ctx, spaceid)
space, err := c.getSpace(c.serviceAccountContext, spaceid)
if err != nil {
return OC10Notification{}, err
}
@@ -211,12 +202,7 @@ func (c *Converter) shareMessage(eventid string, nt NotificationTemplate, execut
return OC10Notification{}, err
}
ctx, err := c.authenticate()
if err != nil {
return OC10Notification{}, err
}
info, err := c.getResource(ctx, resourceid)
info, err := c.getResource(c.serviceAccountContext, resourceid)
if err != nil {
return OC10Notification{}, err
}
@@ -328,27 +314,11 @@ func (c *Converter) deprovisionMessage(nt NotificationTemplate, deproDate string
}, nil
}
func (c *Converter) authenticate() (context.Context, error) {
if c.serviceAccountContext != nil {
return c.serviceAccountContext, nil
}
gatewayClient, err := c.gatewaySelector.Next()
if err != nil {
return nil, err
}
ctx, err := utils.GetServiceUserContext(c.serviceAccountID, gatewayClient, c.serviceAccountSecret)
if err == nil {
c.serviceAccountContext = ctx
}
return ctx, err
}
func (c *Converter) getSpace(ctx context.Context, spaceID string) (*storageprovider.StorageSpace, error) {
if space, ok := c.spaces[spaceID]; ok {
return space, nil
}
space, err := getSpace(ctx, spaceID, c.gatewaySelector)
space, err := utils.GetSpace(ctx, spaceID, c.gwc)
if err == nil {
c.spaces[spaceID] = space
}
@@ -359,7 +329,7 @@ func (c *Converter) getResource(ctx context.Context, resourceID *storageprovider
if r, ok := c.resources[resourceID.GetOpaqueId()]; ok {
return r, nil
}
resource, err := getResource(ctx, resourceID, c.gatewaySelector)
resource, err := utils.GetResourceByID(ctx, resourceID, c.gwc)
if err == nil {
c.resources[resourceID.GetOpaqueId()] = resource
}
@@ -370,7 +340,7 @@ func (c *Converter) getUser(ctx context.Context, userID *user.UserId) (*user.Use
if u, ok := c.users[userID.GetOpaqueId()]; ok {
return u, nil
}
usr, err := getUser(ctx, userID, c.gatewaySelector)
usr, err := utils.GetUser(userID, c.gwc)
if err == nil {
c.users[userID.GetOpaqueId()] = usr
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/cs3org/reva/v2/pkg/appctx"
revactx "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/owncloud/ocis/v2/ocis-pkg/roles"
"github.com/owncloud/ocis/v2/services/graph/pkg/service/v0/errorcode"
settings "github.com/owncloud/ocis/v2/services/settings/pkg/service/v0"
@@ -44,7 +45,21 @@ func (ul *UserlogService) HandleGetEvents(w http.ResponseWriter, r *http.Request
Value: attribute.IntValue(len(evs)),
})
conv := ul.getConverter(r.Header.Get(HeaderAcceptLanguage))
gwc, err := ul.gatewaySelector.Next()
if err != nil {
ul.log.Error().Err(err).Msg("cant get gateway client")
w.WriteHeader(http.StatusInternalServerError)
return
}
ctx, err = utils.GetServiceUserContext(ul.cfg.ServiceAccount.ServiceAccountID, gwc, ul.cfg.ServiceAccount.ServiceAccountSecret)
if err != nil {
ul.log.Error().Err(err).Msg("cant get service account")
w.WriteHeader(http.StatusInternalServerError)
return
}
conv := NewConverter(ctx, r.Header.Get(HeaderAcceptLanguage), gwc, ul.cfg.Service.Name, ul.cfg.TranslationPath)
resp := GetEventResponseOC10{}
for _, e := range evs {

View File

@@ -9,10 +9,7 @@ import (
"time"
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
group "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1"
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
storageprovider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/utils"
@@ -114,6 +111,18 @@ func (ul *UserlogService) processEvent(event events.Event) {
err error
)
gwc, err := ul.gatewaySelector.Next()
if err != nil {
ul.log.Error().Err(err).Msg("cannot get gateway client")
return
}
ctx, err := utils.GetServiceUserContext(ul.cfg.ServiceAccount.ServiceAccountID, gwc, ul.cfg.ServiceAccount.ServiceAccountSecret)
if err != nil {
ul.log.Error().Err(err).Msg("cannot get service account")
return
}
switch e := event.Event.(type) {
default:
err = errors.New("unhandled event")
@@ -140,7 +149,7 @@ func (ul *UserlogService) processEvent(event events.Event) {
// space related // TODO: how to find spaceadmins?
case events.SpaceDisabled:
executant = e.Executant
users, err = ul.findSpaceMembers(ul.mustAuthenticate(), e.ID.GetOpaqueId(), viewer)
users, err = utils.GetSpaceMembers(ctx, e.ID.GetOpaqueId(), gwc, utils.ViewerRole)
case events.SpaceDeleted:
executant = e.Executant
for u := range e.FinalMembers {
@@ -148,22 +157,22 @@ func (ul *UserlogService) processEvent(event events.Event) {
}
case events.SpaceShared:
executant = e.Executant
users, err = ul.resolveID(ul.mustAuthenticate(), e.GranteeUserID, e.GranteeGroupID)
users, err = utils.ResolveID(ctx, e.GranteeUserID, e.GranteeGroupID, gwc)
case events.SpaceUnshared:
executant = e.Executant
users, err = ul.resolveID(ul.mustAuthenticate(), e.GranteeUserID, e.GranteeGroupID)
users, err = utils.ResolveID(ctx, e.GranteeUserID, e.GranteeGroupID, gwc)
case events.SpaceMembershipExpired:
users, err = ul.resolveID(ul.mustAuthenticate(), e.GranteeUserID, e.GranteeGroupID)
users, err = utils.ResolveID(ctx, e.GranteeUserID, e.GranteeGroupID, gwc)
// share related
case events.ShareCreated:
executant = e.Executant
users, err = ul.resolveID(ul.mustAuthenticate(), e.GranteeUserID, e.GranteeGroupID)
users, err = utils.ResolveID(ctx, e.GranteeUserID, e.GranteeGroupID, gwc)
case events.ShareRemoved:
executant = e.Executant
users, err = ul.resolveID(ul.mustAuthenticate(), e.GranteeUserID, e.GranteeGroupID)
users, err = utils.ResolveID(ctx, e.GranteeUserID, e.GranteeGroupID, gwc)
case events.ShareExpired:
users, err = ul.resolveID(ul.mustAuthenticate(), e.GranteeUserID, e.GranteeGroupID)
users, err = utils.ResolveID(ctx, e.GranteeUserID, e.GranteeGroupID, gwc)
}
if err != nil {
@@ -180,7 +189,12 @@ func (ul *UserlogService) processEvent(event events.Event) {
// III) store the eventID for each user
for _, id := range users {
if err := ul.addEventToUser(id, event); err != nil {
if !ul.cfg.DisableSSE {
if err := ul.sendSSE(ctx, id, event, gwc); err != nil {
ul.log.Error().Err(err).Str("userid", id).Str("eventid", event.ID).Msg("cannot create sse event")
}
}
if err := ul.addEventToUser(ctx, id, event); err != nil {
ul.log.Error().Err(err).Str("userID", id).Str("eventid", event.ID).Msg("failed to store event for user")
return
}
@@ -316,19 +330,14 @@ func (ul *UserlogService) DeleteGlobalEvents(ctx context.Context, evnames []stri
})
}
func (ul *UserlogService) addEventToUser(userid string, event events.Event) error {
if !ul.cfg.DisableSSE {
if err := ul.sendSSE(userid, event); err != nil {
ul.log.Error().Err(err).Str("userid", userid).Str("eventid", event.ID).Msg("cannot create sse event")
}
}
func (ul *UserlogService) addEventToUser(ctx context.Context, userid string, event events.Event) error {
return ul.alterUserEventList(userid, func(ids []string) []string {
return append(ids, event.ID)
})
}
func (ul *UserlogService) sendSSE(userid string, event events.Event) error {
ev, err := ul.getConverter(ul.getUserLocale(userid)).ConvertEvent(event.ID, event.Event)
func (ul *UserlogService) sendSSE(ctx context.Context, userid string, event events.Event, gwc gateway.GatewayAPIClient) error {
ev, err := NewConverter(ctx, ul.getUserLocale(userid), gwc, ul.cfg.Service.Name, ul.cfg.TranslationPath).ConvertEvent(event.ID, event.Event)
if err != nil {
return err
}
@@ -419,105 +428,6 @@ func (ul *UserlogService) alterGlobalEvents(ctx context.Context, alter func(map[
})
}
// we need the spaceid to inform other space members
// we need an owner to query space members
// we need to check the user has the required role to see the event
func (ul *UserlogService) findSpaceMembers(ctx context.Context, spaceID string, requiredRole permissionChecker) ([]string, error) {
if ctx == nil {
return nil, errors.New("need authenticated context to find space members")
}
space, err := getSpace(ctx, spaceID, ul.gatewaySelector)
if err != nil {
return nil, err
}
var users []string
switch space.SpaceType {
case "personal":
users = []string{space.GetOwner().GetId().GetOpaqueId()}
case "project":
if users, err = ul.gatherSpaceMembers(ctx, space, requiredRole); err != nil {
return nil, err
}
default:
// TODO: shares? other space types?
return nil, fmt.Errorf("unsupported space type: %s", space.SpaceType)
}
return users, nil
}
func (ul *UserlogService) gatherSpaceMembers(ctx context.Context, space *storageprovider.StorageSpace, hasRequiredRole permissionChecker) ([]string, error) {
var permissionsMap map[string]*storageprovider.ResourcePermissions
if err := utils.ReadJSONFromOpaque(space.GetOpaque(), "grants", &permissionsMap); err != nil {
return nil, err
}
groupsMap := make(map[string]struct{})
if opaqueGroups, ok := space.Opaque.Map["groups"]; ok {
_ = json.Unmarshal(opaqueGroups.GetValue(), &groupsMap)
}
// we use a map to avoid duplicates
usermap := make(map[string]struct{})
for id, perm := range permissionsMap {
if !hasRequiredRole(perm) {
// not allowed to receive event
continue
}
if _, isGroup := groupsMap[id]; !isGroup {
usermap[id] = struct{}{}
continue
}
usrs, err := ul.resolveGroup(ctx, id)
if err != nil {
ul.log.Error().Err(err).Str("groupID", id).Msg("failed to resolve group")
continue
}
for _, u := range usrs {
usermap[u] = struct{}{}
}
}
var users []string
for id := range usermap {
users = append(users, id)
}
return users, nil
}
func (ul *UserlogService) resolveID(ctx context.Context, userid *user.UserId, groupid *group.GroupId) ([]string, error) {
if userid != nil {
return []string{userid.GetOpaqueId()}, nil
}
if ctx == nil {
return nil, errors.New("need ctx to resolve group id")
}
return ul.resolveGroup(ctx, groupid.GetOpaqueId())
}
// resolves the users of a group
func (ul *UserlogService) resolveGroup(ctx context.Context, groupID string) ([]string, error) {
grp, err := getGroup(ctx, groupID, ul.gatewaySelector)
if err != nil {
return nil, err
}
var userIDs []string
for _, m := range grp.GetMembers() {
userIDs = append(userIDs, m.GetOpaqueId())
}
return userIDs, nil
}
func (ul *UserlogService) getUserLocale(userid string) string {
resp, err := ul.valueClient.GetValueByUniqueIdentifiers(
micrometadata.Set(context.Background(), middleware.AccountID, userid),
@@ -537,122 +447,6 @@ func (ul *UserlogService) getUserLocale(userid string) string {
return val[0].GetStringValue()
}
func (ul *UserlogService) getConverter(locale string) *Converter {
return NewConverter(locale, ul.gatewaySelector, ul.cfg.MachineAuthAPIKey, ul.cfg.Service.Name, ul.cfg.TranslationPath, ul.cfg.ServiceAccount.ServiceAccountID, ul.cfg.ServiceAccount.ServiceAccountSecret)
}
func (ul *UserlogService) mustAuthenticate() context.Context {
ctx, err := authenticate(ul.cfg.ServiceAccount.ServiceAccountID, ul.gatewaySelector, ul.cfg.ServiceAccount.ServiceAccountSecret)
if err != nil {
ul.log.Error().Err(err).Str("accountid", ul.cfg.ServiceAccount.ServiceAccountID).Msg("failed to impersonate service account")
return nil
}
return ctx
}
func authenticate(serviceAccountID string, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], serviceAccountSecret string) (context.Context, error) {
gatewayClient, err := gatewaySelector.Next()
if err != nil {
return nil, err
}
return utils.GetServiceUserContext(serviceAccountID, gatewayClient, serviceAccountSecret)
}
func getSpace(ctx context.Context, spaceID string, gatewaySelector pool.Selectable[gateway.GatewayAPIClient]) (*storageprovider.StorageSpace, error) {
gatewayClient, err := gatewaySelector.Next()
if err != nil {
return nil, err
}
res, err := gatewayClient.ListStorageSpaces(ctx, listStorageSpaceRequest(spaceID))
if err != nil {
return nil, err
}
if res.GetStatus().GetCode() != rpc.Code_CODE_OK {
return nil, fmt.Errorf("error while getting space: (%v) %s", res.GetStatus().GetCode(), res.GetStatus().GetMessage())
}
if len(res.StorageSpaces) == 0 {
return nil, fmt.Errorf("error getting storage space %s: no space returned", spaceID)
}
return res.StorageSpaces[0], nil
}
func getUser(ctx context.Context, userid *user.UserId, gatewaySelector pool.Selectable[gateway.GatewayAPIClient]) (*user.User, error) {
gatewayClient, err := gatewaySelector.Next()
if err != nil {
return nil, err
}
getUserResponse, err := gatewayClient.GetUser(context.Background(), &user.GetUserRequest{
UserId: userid,
})
if err != nil {
return nil, err
}
if getUserResponse.Status.Code != rpc.Code_CODE_OK {
return nil, fmt.Errorf("error getting user: %s", getUserResponse.Status.Message)
}
return getUserResponse.GetUser(), nil
}
func getGroup(ctx context.Context, groupid string, gatewaySelector pool.Selectable[gateway.GatewayAPIClient]) (*group.Group, error) {
gatewayClient, err := gatewaySelector.Next()
if err != nil {
return nil, err
}
r, err := gatewayClient.GetGroup(ctx, &group.GetGroupRequest{GroupId: &group.GroupId{OpaqueId: groupid}})
if err != nil {
return nil, err
}
if r.GetStatus().GetCode() != rpc.Code_CODE_OK {
return nil, fmt.Errorf("unexpected status code from gateway client: %d", r.GetStatus().GetCode())
}
return r.GetGroup(), nil
}
func getResource(ctx context.Context, resourceid *storageprovider.ResourceId, gatewaySelector pool.Selectable[gateway.GatewayAPIClient]) (*storageprovider.ResourceInfo, error) {
gatewayClient, err := gatewaySelector.Next()
if err != nil {
return nil, err
}
res, err := gatewayClient.Stat(ctx, &storageprovider.StatRequest{Ref: &storageprovider.Reference{ResourceId: resourceid}})
if err != nil {
return nil, err
}
if res.GetStatus().GetCode() != rpc.Code_CODE_OK {
return nil, fmt.Errorf("unexpected status code while getting space: %v", res.GetStatus().GetCode())
}
return res.GetInfo(), nil
}
func listStorageSpaceRequest(spaceID string) *storageprovider.ListStorageSpacesRequest {
return &storageprovider.ListStorageSpacesRequest{
Opaque: utils.AppendPlainToOpaque(nil, "unrestricted", "true"),
Filters: []*storageprovider.ListStorageSpacesRequest_Filter{
{
Type: storageprovider.ListStorageSpacesRequest_Filter_TYPE_ID,
Term: &storageprovider.ListStorageSpacesRequest_Filter_Id{
Id: &storageprovider.StorageSpaceId{
OpaqueId: spaceID,
},
},
},
},
}
}
func removeExecutant(users []string, executant *user.UserId) []string {
var usrs []string
for _, u := range users {
@@ -662,17 +456,3 @@ func removeExecutant(users []string, executant *user.UserId) []string {
}
return usrs
}
type permissionChecker func(*storageprovider.ResourcePermissions) bool
func viewer(perms *storageprovider.ResourcePermissions) bool {
return perms.Stat
}
func editor(perms *storageprovider.ResourcePermissions) bool {
return perms.InitiateFileUpload
}
func manager(perms *storageprovider.ResourcePermissions) bool {
return perms.DenyGrant
}