feat: Persist events for grouped emails based on settings

This commit is contained in:
Bastian Beier
2025-01-08 19:40:16 +01:00
parent 9af84f02e4
commit 9849e747e1
11 changed files with 664 additions and 157 deletions

View File

@@ -3,6 +3,10 @@ package command
import (
"context"
"fmt"
"github.com/cs3org/reva/v2/pkg/store"
ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
microstore "go-micro.dev/v4/store"
"reflect"
"github.com/oklog/run"
"github.com/urfave/cli/v2"
@@ -81,7 +85,14 @@ func Server(cfg *config.Config) *cli.Command {
events.SpaceUnshared{},
events.SpaceMembershipExpired{},
events.ScienceMeshInviteTokenGenerated{},
events.SendEmailsEvent{},
}
registeredEvents := make(map[string]events.Unmarshaller)
for _, e := range evs {
typ := reflect.TypeOf(e)
registeredEvents[typ.String()] = e
}
client, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Notifications.Events))
if err != nil {
return err
@@ -109,7 +120,21 @@ func Server(cfg *config.Config) *cli.Command {
logger.Fatal().Err(err).Str("addr", cfg.Notifications.RevaGateway).Msg("could not get reva gateway selector")
}
valueService := settingssvc.NewValueService("com.owncloud.api.settings", grpcClient)
svc := service.NewEventsNotifier(evts, channel, logger, gatewaySelector, valueService, cfg.ServiceAccount.ServiceAccountID, cfg.ServiceAccount.ServiceAccountSecret, cfg.Notifications.EmailTemplatePath, cfg.Notifications.DefaultLanguage, cfg.WebUIURL, cfg.Notifications.TranslationPath)
historyClient := ehsvc.NewEventHistoryService("com.owncloud.api.eventhistory", grpcClient)
notificationStore := store.Create(
store.Store(cfg.Store.Store),
store.TTL(cfg.Store.TTL),
microstore.Nodes(cfg.Store.Nodes...),
microstore.Database(cfg.Store.Database),
microstore.Table(cfg.Store.Table),
store.Authentication(cfg.Store.AuthUsername, cfg.Store.AuthPassword),
)
svc := service.NewEventsNotifier(evts, channel, logger, gatewaySelector, valueService,
cfg.ServiceAccount.ServiceAccountID, cfg.ServiceAccount.ServiceAccountSecret,
cfg.Notifications.EmailTemplatePath, cfg.Notifications.DefaultLanguage, cfg.WebUIURL,
cfg.Notifications.TranslationPath, cfg.Notifications.SMTP.Sender, notificationStore, historyClient, registeredEvents)
gr.Add(svc.Run, func(error) {
cancel()

View File

@@ -3,6 +3,7 @@ package config
import (
"context"
"time"
"github.com/owncloud/ocis/v2/ocis-pkg/shared"
)
@@ -24,6 +25,8 @@ type Config struct {
ServiceAccount ServiceAccount `yaml:"service_account"`
Context context.Context `yaml:"-"`
Store Store `yaml:"store"`
}
// Notifications defines the config options for the notifications service.
@@ -65,3 +68,20 @@ type ServiceAccount struct {
ServiceAccountID string `yaml:"service_account_id" env:"OCIS_SERVICE_ACCOUNT_ID;NOTIFICATIONS_SERVICE_ACCOUNT_ID" desc:"The ID of the service account the service should use. See the 'auth-service' service description for more details." introductionVersion:"5.0"`
ServiceAccountSecret string `yaml:"service_account_secret" env:"OCIS_SERVICE_ACCOUNT_SECRET;NOTIFICATIONS_SERVICE_ACCOUNT_SECRET" desc:"The service account secret." introductionVersion:"5.0"`
}
// TODO:
// - README
// - DOCS
// - introductionVersion correct?
// - is TTL mandatory?
// Store configures the store to use
type Store struct {
Store string `yaml:"store" env:"OCIS_PERSISTENT_STORE;NOTIFICATIONS_STORE" desc:"The type of the store. Supported values are: 'memory', 'nats-js-kv', 'redis-sentinel', 'noop'. See the text description for details." introductionVersion:"7.1"`
Nodes []string `yaml:"nodes" env:"OCIS_PERSISTENT_STORE_NODES;NOTIFICATIONS_STORE_NODES" desc:"A list of nodes to access the configured store. This has no effect when 'memory' store is configured. Note that the behaviour how nodes are used is dependent on the library of the configured store. See the Environment Variable Types description for more details." introductionVersion:"7.1"`
Database string `yaml:"database" env:"NOTIFICATIONS_STORE_DATABASE" desc:"The database name the configured store should use." introductionVersion:"7.1"`
Table string `yaml:"table" env:"NOTIFICATIONS_STORE_TABLE" desc:"The database table the store should use." introductionVersion:"7.1"`
TTL time.Duration `yaml:"ttl" env:"OCIS_PERSISTENT_STORE_TTL;NOTIFICATIONS_STORE_TTL" desc:"Time to live for notifications in the store. Defaults to '336h' (2 weeks). See the Environment Variable Types description for more details." introductionVersion:"7.1"`
AuthUsername string `yaml:"username" env:"OCIS_PERSISTENT_STORE_AUTH_USERNAME;NOTIFICATIONS_STORE_AUTH_USERNAME" desc:"The username to authenticate with the store. Only applies when store type 'nats-js-kv' is configured." introductionVersion:"7.1"`
AuthPassword string `yaml:"password" env:"OCIS_PERSISTENT_STORE_AUTH_PASSWORD;NOTIFICATIONS_STORE_AUTH_PASSWORD" desc:"The password to authenticate with the store. Only applies when store type 'nats-js-kv' is configured." introductionVersion:"7.1"`
}

View File

@@ -4,6 +4,7 @@ import (
"github.com/owncloud/ocis/v2/ocis-pkg/shared"
"github.com/owncloud/ocis/v2/ocis-pkg/structs"
"github.com/owncloud/ocis/v2/services/notifications/pkg/config"
"time"
)
// FullDefaultConfig returns a fully initialized default configuration
@@ -40,6 +41,13 @@ func DefaultConfig() *config.Config {
},
RevaGateway: shared.DefaultRevaConfig().Address,
},
Store: config.Store{
Store: "nats-js-kv",
Nodes: []string{"127.0.0.1:9233"},
Database: "notifications",
Table: "",
TTL: 336 * time.Hour,
},
}
}

View File

@@ -0,0 +1,116 @@
package service
import (
"context"
"encoding/json"
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
v0 "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/eventhistory/v0"
ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
"github.com/pkg/errors"
"go-micro.dev/v4/store"
)
type userEventStore struct {
log log.Logger
store store.Store
historyClient ehsvc.EventHistoryService
}
type userEventIds struct {
User *user.User `json:"user"`
EventIds []string `json:"event_ids"`
}
type userEvents struct {
User *user.User
Events []*v0.Event
}
const (
intervalDaily = "daily"
intervalWeekly = "weekly"
)
func newUserEventStore(l log.Logger, s store.Store, hc ehsvc.EventHistoryService) *userEventStore {
return &userEventStore{log: l, store: s, historyClient: hc}
}
func (s *userEventStore) persist(interval string, eventId string, users []*user.User) []*user.User {
var errorUsers []*user.User
for _, u := range users {
key := interval + u.Id.OpaqueId
// Note: This is not thread safe and can result in missing events
records, err := s.store.Read(key)
if err != nil && err != store.ErrNotFound {
s.log.Error().Err(err).Str("eventId", eventId).Str("userId", u.Id.OpaqueId).Msg("cannot read record")
errorUsers = append(errorUsers, u)
continue
}
var record userEventIds
if len(records) == 0 {
record = userEventIds{}
} else {
if err = json.Unmarshal(records[0].Value, &record); err != nil {
s.log.Warn().Err(err).Str("eventId", eventId).Str("userId", u.Id.OpaqueId).Msg("cannot unmarshal json")
errorUsers = append(errorUsers, u)
continue
}
}
record.User = u
record.EventIds = append(record.EventIds, eventId)
b, err := json.Marshal(record)
if err != nil {
s.log.Warn().Err(err).Str("eventId", eventId).Str("userId", u.Id.OpaqueId).Msg("cannot marshal record")
errorUsers = append(errorUsers, u)
continue
}
err = s.store.Write(&store.Record{
Key: key,
Value: b,
})
if err != nil {
s.log.Error().Err(err).Str("eventId", eventId).Str("userId", u.Id.OpaqueId).Msg("cannot write record")
errorUsers = append(errorUsers, u)
continue
}
}
return errorUsers
}
func (s *userEventStore) listKeys(interval string) ([]string, error) {
return s.store.List(store.ListPrefix(interval))
}
func (s *userEventStore) pop(ctx context.Context, key string) (*userEvents, error) {
records, err := s.store.Read(key)
if err != nil && err != store.ErrNotFound {
return nil, errors.New("cannot get records")
}
if len(records) == 0 {
return nil, errors.New("no records found")
}
var record userEventIds
err = json.Unmarshal(records[0].Value, &record)
if err != nil {
s.log.Warn().Err(err).Str("key", key).Msg("cannot unmarshal json")
return nil, err
}
res, err := s.historyClient.GetEvents(ctx, &ehsvc.GetEventsRequest{Ids: record.EventIds})
if err != nil {
s.log.Error().Err(err).Strs("eventIds", record.EventIds).Msg("cannot get events")
return nil, err
}
err = s.store.Delete(key)
if err != nil {
s.log.Error().Err(err).Strs("eventIds", record.EventIds).Msg("cannot delete records")
return nil, err
}
return &userEvents{
User: record.User,
Events: res.GetEvents(),
}, nil
}

View File

@@ -71,7 +71,7 @@ func (s eventsNotifier) handleScienceMeshInviteTokenGenerated(e events.ScienceMe
msgENV,
)
if err != nil {
s.logger.Error().Err(err).Msg("building the message has failed")
logger.Error().Err(err).Msg("building the message has failed")
return
}

View File

@@ -4,6 +4,8 @@ import (
"context"
"errors"
"fmt"
ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
"go-micro.dev/v4/store"
"net/url"
"os"
"os/signal"
@@ -50,7 +52,10 @@ func NewEventsNotifier(
logger log.Logger,
gatewaySelector pool.Selectable[gateway.GatewayAPIClient],
valueService settingssvc.ValueService,
serviceAccountID, serviceAccountSecret, emailTemplatePath, defaultLanguage, ocisURL, translationPath string) Service {
serviceAccountID, serviceAccountSecret, emailTemplatePath, defaultLanguage, ocisURL, translationPath, emailSender string,
store store.Store,
historyClient ehsvc.EventHistoryService,
registeredEvents map[string]events.Unmarshaller) Service {
return eventsNotifier{
logger: logger,
@@ -63,9 +68,13 @@ func NewEventsNotifier(
serviceAccountSecret: serviceAccountSecret,
emailTemplatePath: emailTemplatePath,
defaultLanguage: defaultLanguage,
defaultEmailSender: emailSender,
ocisURL: ocisURL,
translationPath: translationPath,
filter: newNotificationFilter(logger, valueService),
splitter: newIntervalSplitter(logger, valueService),
userEventStore: newUserEventStore(logger, store, historyClient),
registeredEvents: registeredEvents,
}
}
@@ -79,10 +88,14 @@ type eventsNotifier struct {
emailTemplatePath string
translationPath string
defaultLanguage string
defaultEmailSender string
ocisURL string
serviceAccountID string
serviceAccountSecret string
filter *notificationFilter
splitter *intervalSplitter
userEventStore *userEventStore
registeredEvents map[string]events.Unmarshaller
}
func (s eventsNotifier) Run() error {
@@ -95,15 +108,15 @@ func (s eventsNotifier) Run() error {
go func() {
switch e := evt.Event.(type) {
case events.SpaceShared:
s.handleSpaceShared(e)
s.handleSpaceShared(e, evt.ID)
case events.SpaceUnshared:
s.handleSpaceUnshared(e)
s.handleSpaceUnshared(e, evt.ID)
case events.SpaceMembershipExpired:
s.handleSpaceMembershipExpired(e)
s.handleSpaceMembershipExpired(e, evt.ID)
case events.ShareCreated:
s.handleShareCreated(e)
s.handleShareCreated(e, evt.ID)
case events.ShareExpired:
s.handleShareExpired(e)
s.handleShareExpired(e, evt.ID)
case events.ScienceMeshInviteTokenGenerated:
s.handleScienceMeshInviteTokenGenerated(e)
}
@@ -135,8 +148,8 @@ func (s eventsNotifier) render(ctx context.Context, template email.MessageTempla
return messageList, nil
}
func (s eventsNotifier) send(ctx context.Context, recipientList []*channels.Message) {
for _, r := range recipientList {
func (s eventsNotifier) send(ctx context.Context, emails []*channels.Message) {
for _, r := range emails {
err := s.channel.SendMessage(ctx, r)
if err != nil {
s.logger.Error().Err(err).Str("event", "SendEmail").Msg("failed to send a message")

View File

@@ -2,6 +2,7 @@ package service_test
import (
"context"
"github.com/cs3org/reva/v2/pkg/store"
settingsmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/settings/v0"
"time"
@@ -88,12 +89,14 @@ var _ = Describe("Notifications", func() {
}
})
DescribeTable("Sending notifications",
DescribeTable("Sending userEventIds",
func(tc testChannel, ev events.Event) {
cfg := defaults.FullDefaultConfig()
cfg.GRPCClientTLS = &shared.GRPCClientTLS{}
ch := make(chan events.Event)
evts := service.NewEventsNotifier(ch, tc, log.NewLogger(), gatewaySelector, vs, "", "", "", "", "", "")
evts := service.NewEventsNotifier(ch, tc, log.NewLogger(), gatewaySelector, vs, "",
"", "", "", "", "", "",
store.Create(), nil, nil)
go evts.Run()
ch <- ev
@@ -301,12 +304,14 @@ var _ = Describe("Notifications X-Site Scripting", func() {
}
})
DescribeTable("Sending notifications",
DescribeTable("Sending userEventIds",
func(tc testChannel, ev events.Event) {
cfg := defaults.FullDefaultConfig()
cfg.GRPCClientTLS = &shared.GRPCClientTLS{}
ch := make(chan events.Event)
evts := service.NewEventsNotifier(ch, tc, log.NewLogger(), gatewaySelector, vs, "", "", "", "", "", "")
evts := service.NewEventsNotifier(ch, tc, log.NewLogger(), gatewaySelector, vs, "",
"", "", "", "", "", "",
store.Create(), nil, nil)
go evts.Run()
ch <- ev

View File

@@ -1,29 +1,63 @@
package service
import (
"context"
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/services/notifications/pkg/email"
"github.com/owncloud/ocis/v2/services/settings/pkg/store/defaults"
"google.golang.org/protobuf/types/known/fieldmaskpb"
)
func (s eventsNotifier) handleShareCreated(e events.ShareCreated) {
logger := s.logger.With().
func (s eventsNotifier) handleShareCreated(e events.ShareCreated, eventId string) {
logger := log.Logger{Logger: s.logger.With().
Str("event", "ShareCreated").
Str("itemid", e.ItemID.OpaqueId).
Logger()
Logger()}
owner, shareFolder, shareLink, ctx, err := s.prepareShareCreated(logger, e)
if err != nil {
logger.Error().Err(err).Msg("could not prepare vars for email")
return
}
granteeList := s.ensureGranteeList(ctx, owner.GetId(), e.GranteeUserID, e.GranteeGroupID)
filteredGrantees := s.filter.execute(ctx, granteeList, defaults.SettingUUIDProfileEventShareCreated)
recipientsInstant, recipientsDaily, recipientsInstantWeekly := s.splitter.execute(ctx, filteredGrantees)
recipientsInstant = append(recipientsInstant, s.userEventStore.persist(intervalDaily, eventId, recipientsDaily)...)
recipientsInstant = append(recipientsInstant, s.userEventStore.persist(intervalWeekly, eventId, recipientsInstantWeekly)...)
if recipientsInstant == nil {
return
}
sharerDisplayName := owner.GetDisplayName()
emails, err := s.render(ctx, email.ShareCreated,
"ShareGrantee",
map[string]string{
"ShareSharer": sharerDisplayName,
"ShareFolder": shareFolder,
"ShareLink": shareLink,
}, recipientsInstant, sharerDisplayName)
if err != nil {
logger.Error().Err(err).Msg("could not get render the email")
return
}
s.send(ctx, emails)
}
func (s eventsNotifier) prepareShareCreated(logger log.Logger, e events.ShareCreated) (owner *user.User, shareFolder, shareLink string, ctx context.Context, err error) {
gatewayClient, err := s.gatewaySelector.Next()
if err != nil {
logger.Error().Err(err).Msg("could not select next gateway client")
return
}
ctx, err := utils.GetServiceUserContext(s.serviceAccountID, gatewayClient, s.serviceAccountSecret)
ctx, err = utils.GetServiceUserContextWithContext(context.Background(), gatewayClient, s.serviceAccountID, s.serviceAccountSecret)
if err != nil {
logger.Error().Err(err).Msg("Could not impersonate service user")
return
logger.Error().Err(err).Msg("could not get service user context")
}
resourceInfo, err := s.getResourceInfo(ctx, e.ItemID, &fieldmaskpb.FieldMask{Paths: []string{"name"}})
@@ -33,8 +67,9 @@ func (s eventsNotifier) handleShareCreated(e events.ShareCreated) {
Msg("could not stat resource")
return
}
shareFolder = resourceInfo.Name
shareLink, err := urlJoinPath(s.ocisURL, "files/shares/with-me")
shareLink, err = urlJoinPath(s.ocisURL, "files/shares/with-me")
if err != nil {
logger.Error().
Err(err).
@@ -42,38 +77,22 @@ func (s eventsNotifier) handleShareCreated(e events.ShareCreated) {
return
}
owner, err := utils.GetUser(e.Sharer, gatewayClient)
owner, err = utils.GetUserWithContext(ctx, e.Sharer, gatewayClient)
if err != nil {
logger.Error().Err(err).Msg("Could not get user")
logger.Error().
Err(err).
Msg("could not get user")
return
}
granteeList := s.ensureGranteeList(ctx, owner.GetId(), e.GranteeUserID, e.GranteeGroupID)
filteredGrantees := s.filter.execute(ctx, granteeList, defaults.SettingUUIDProfileEventShareCreated)
if filteredGrantees == nil {
return
}
sharerDisplayName := owner.GetDisplayName()
recipientList, err := s.render(ctx, email.ShareCreated,
"ShareGrantee",
map[string]string{
"ShareSharer": sharerDisplayName,
"ShareFolder": resourceInfo.Name,
"ShareLink": shareLink,
}, filteredGrantees, sharerDisplayName)
if err != nil {
s.logger.Error().Err(err).Str("event", "ShareCreated").Msg("could not get render the email")
return
}
s.send(ctx, recipientList)
return
}
func (s eventsNotifier) handleShareExpired(e events.ShareExpired) {
logger := s.logger.With().
func (s eventsNotifier) handleShareExpired(e events.ShareExpired, eventId string) {
logger := log.Logger{Logger: s.logger.With().
Str("event", "ShareExpired").
Str("itemid", e.ItemID.GetOpaqueId()).
Logger()
Logger()}
gatewayClient, err := s.gatewaySelector.Next()
if err != nil {
@@ -81,12 +100,53 @@ func (s eventsNotifier) handleShareExpired(e events.ShareExpired) {
return
}
ctx, err := utils.GetServiceUserContext(s.serviceAccountID, gatewayClient, s.serviceAccountSecret)
shareFolder, ctx, err := s.prepareShareExpired(logger, e)
if err != nil {
logger.Error().Err(err).Msg("Could not impersonate sharer")
logger.Error().Err(err).Msg("could not prepare vars for email")
return
}
owner, err := utils.GetUserWithContext(ctx, e.ShareOwner, gatewayClient)
if err != nil {
logger.Error().Err(err).Msg("Could not get user")
return
}
granteeList := s.ensureGranteeList(ctx, owner.GetId(), e.GranteeUserID, e.GranteeGroupID)
filteredGrantees := s.filter.execute(ctx, granteeList, defaults.SettingUUIDProfileEventShareExpired)
recipientsInstant, recipientsDaily, recipientsInstantWeekly := s.splitter.execute(ctx, filteredGrantees)
recipientsInstant = append(recipientsInstant, s.userEventStore.persist(intervalDaily, eventId, recipientsDaily)...)
recipientsInstant = append(recipientsInstant, s.userEventStore.persist(intervalWeekly, eventId, recipientsInstantWeekly)...)
if recipientsInstant == nil {
return
}
emails, err := s.render(ctx, email.ShareExpired,
"ShareGrantee",
map[string]string{
"ShareFolder": shareFolder,
"ExpiredAt": e.ExpiredAt.Format("2006-01-02 15:04:05"),
}, recipientsInstant, owner.GetDisplayName())
if err != nil {
logger.Error().Err(err).Msg("could not get render the email")
return
}
s.send(ctx, emails)
}
func (s eventsNotifier) prepareShareExpired(logger log.Logger, e events.ShareExpired) (shareFolder string, ctx context.Context, err error) {
gatewayClient, err := s.gatewaySelector.Next()
if err != nil {
logger.Error().Err(err).Msg("could not select next gateway client")
return
}
ctx, err = utils.GetServiceUserContextWithContext(context.Background(), gatewayClient, s.serviceAccountID, s.serviceAccountSecret)
if err != nil {
logger.Error().Err(err).Msg("could not get service user context")
}
resourceInfo, err := s.getResourceInfo(ctx, e.ItemID, &fieldmaskpb.FieldMask{Paths: []string{"name"}})
if err != nil {
logger.Error().
@@ -94,28 +154,7 @@ func (s eventsNotifier) handleShareExpired(e events.ShareExpired) {
Msg("could not stat resource")
return
}
shareFolder = resourceInfo.GetName()
owner, err := utils.GetUser(e.ShareOwner, gatewayClient)
if err != nil {
logger.Error().Err(err).Msg("Could not get user")
return
}
granteeList := s.ensureGranteeList(ctx, owner.GetId(), e.GranteeUserID, e.GranteeGroupID)
filteredGrantees := s.filter.execute(ctx, granteeList, defaults.SettingUUIDProfileEventShareExpired)
if filteredGrantees == nil {
return
}
recipientList, err := s.render(ctx, email.ShareExpired,
"ShareGrantee",
map[string]string{
"ShareFolder": resourceInfo.GetName(),
"ExpiredAt": e.ExpiredAt.Format("2006-01-02 15:04:05"),
}, filteredGrantees, owner.GetDisplayName())
if err != nil {
s.logger.Error().Err(err).Str("event", "ShareExpired").Msg("could not get render the email")
return
}
s.send(ctx, recipientList)
return
}

View File

@@ -1,104 +1,69 @@
package service
import (
"context"
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/services/notifications/pkg/email"
"github.com/owncloud/ocis/v2/services/settings/pkg/store/defaults"
)
func (s eventsNotifier) handleSpaceShared(e events.SpaceShared) {
logger := s.logger.With().
func (s eventsNotifier) handleSpaceShared(e events.SpaceShared, eventId string) {
logger := log.Logger{Logger: s.logger.With().
Str("event", "SpaceShared").
Str("itemid", e.ID.OpaqueId).
Logger()
gatewayClient, err := s.gatewaySelector.Next()
Logger()}
executant, spaceName, shareLink, ctx, err := s.prepareSpaceShared(logger, e)
if err != nil {
logger.Error().Err(err).Msg("could not select next gateway client")
logger.Error().Err(err).Msg("could not prepare vars for email")
return
}
ctx, err := utils.GetServiceUserContext(s.serviceAccountID, gatewayClient, s.serviceAccountSecret)
if err != nil {
logger.Error().
Err(err).
Msg("could not handle space shared event")
return
}
resourceID, err := storagespace.ParseID(e.ID.OpaqueId)
if err != nil {
logger.Error().
Err(err).
Msg("could not parse resourceid from ItemID ")
return
}
resourceInfo, err := s.getResourceInfo(ctx, &resourceID, nil)
if err != nil {
logger.Error().
Err(err).
Msg("could not get space info")
return
}
shareLink, err := urlJoinPath(s.ocisURL, "f", e.ID.OpaqueId)
if err != nil {
logger.Error().
Err(err).
Msg("could not create link to the share")
return
}
executant, err := utils.GetUser(e.Executant, gatewayClient)
if err != nil {
logger.Error().
Err(err).
Msg("could not get user")
return
}
// Note: We're using the 'executantCtx' (authenticated as the share executant) here for requesting
// the Grantees of the shares. Ideally the notfication service would use some kind of service
// user for this.
granteeList := s.ensureGranteeList(ctx, executant.GetId(), e.GranteeUserID, e.GranteeGroupID)
filteredGrantees := s.filter.execute(ctx, granteeList, defaults.SettingUUIDProfileEventSpaceShared)
if filteredGrantees == nil {
recipientsInstant, recipientsDaily, recipientsInstantWeekly := s.splitter.execute(ctx, filteredGrantees)
recipientsInstant = append(recipientsInstant, s.userEventStore.persist(intervalDaily, eventId, recipientsDaily)...)
recipientsInstant = append(recipientsInstant, s.userEventStore.persist(intervalWeekly, eventId, recipientsInstantWeekly)...)
if recipientsInstant == nil {
return
}
sharerDisplayName := executant.GetDisplayName()
recipientList, err := s.render(ctx, email.SharedSpace,
emails, err := s.render(ctx, email.SharedSpace,
"SpaceGrantee",
map[string]string{
"SpaceSharer": sharerDisplayName,
"SpaceName": resourceInfo.GetSpace().GetName(),
"SpaceName": spaceName,
"ShareLink": shareLink,
}, filteredGrantees, sharerDisplayName)
}, recipientsInstant, sharerDisplayName)
if err != nil {
s.logger.Error().Err(err).Str("event", "SharedSpace").Msg("could not get render the email")
logger.Error().Err(err).Msg("could not get render the email")
return
}
s.send(ctx, recipientList)
s.send(ctx, emails)
}
func (s eventsNotifier) handleSpaceUnshared(e events.SpaceUnshared) {
logger := s.logger.With().
Str("event", "SpaceUnshared").
Str("itemid", e.ID.OpaqueId).
Logger()
func (s eventsNotifier) prepareSpaceShared(logger log.Logger, e events.SpaceShared) (executant *user.User, spaceName, shareLink string, ctx context.Context, err error) {
gatewayClient, err := s.gatewaySelector.Next()
if err != nil {
logger.Error().Err(err).Msg("could not select next gateway client")
return
}
ctx, err := utils.GetServiceUserContext(s.serviceAccountID, gatewayClient, s.serviceAccountSecret)
ctx, err = utils.GetServiceUserContextWithContext(context.Background(), gatewayClient, s.serviceAccountID, s.serviceAccountSecret)
if err != nil {
logger.Error().Err(err).Msg("could not handle space unshared event")
logger.Error().Err(err).Msg("could not get service user context")
}
executant, err = utils.GetUserWithContext(ctx, e.Executant, gatewayClient)
if err != nil {
logger.Error().
Err(err).
Msg("could not get user")
return
}
@@ -117,16 +82,68 @@ func (s eventsNotifier) handleSpaceUnshared(e events.SpaceUnshared) {
Msg("could not get space info")
return
}
spaceName = resourceInfo.GetSpace().GetName()
shareLink, err := urlJoinPath(s.ocisURL, "f", e.ID.OpaqueId)
shareLink, err = urlJoinPath(s.ocisURL, "f", e.ID.OpaqueId)
if err != nil {
logger.Error().
Err(err).
Msg("could not create link to the share")
return
}
return
}
executant, err := utils.GetUser(e.Executant, gatewayClient)
func (s eventsNotifier) handleSpaceUnshared(e events.SpaceUnshared, eventId string) {
logger := log.Logger{Logger: s.logger.With().
Str("event", "SpaceUnshared").
Str("itemid", e.ID.OpaqueId).
Logger()}
executant, spaceName, shareLink, ctx, err := s.prepareSpaceUnshared(logger, e)
if err != nil {
logger.Error().Err(err).Msg("could not prepare vars for email")
return
}
granteeList := s.ensureGranteeList(ctx, executant.GetId(), e.GranteeUserID, e.GranteeGroupID)
filteredGrantees := s.filter.execute(ctx, granteeList, defaults.SettingUUIDProfileEventSpaceUnshared)
recipientsInstant, recipientsDaily, recipientsInstantWeekly := s.splitter.execute(ctx, filteredGrantees)
recipientsInstant = append(recipientsInstant, s.userEventStore.persist(intervalDaily, eventId, recipientsDaily)...)
recipientsInstant = append(recipientsInstant, s.userEventStore.persist(intervalWeekly, eventId, recipientsInstantWeekly)...)
if recipientsInstant == nil {
return
}
sharerDisplayName := executant.GetDisplayName()
emails, err := s.render(ctx, email.UnsharedSpace,
"SpaceGrantee",
map[string]string{
"SpaceSharer": sharerDisplayName,
"SpaceName": spaceName,
"ShareLink": shareLink,
}, recipientsInstant, sharerDisplayName)
if err != nil {
logger.Error().Err(err).Msg("Could not get render the email")
return
}
s.send(ctx, emails)
}
func (s eventsNotifier) prepareSpaceUnshared(logger log.Logger, e events.SpaceUnshared) (executant *user.User, spaceName, shareLink string, ctx context.Context, err error) {
gatewayClient, err := s.gatewaySelector.Next()
if err != nil {
logger.Error().Err(err).Msg("could not select next gateway client")
return
}
ctx, err = utils.GetServiceUserContextWithContext(context.Background(), gatewayClient, s.serviceAccountID, s.serviceAccountSecret)
if err != nil {
logger.Error().Err(err).Msg("could not get service user context")
}
executant, err = utils.GetUserWithContext(ctx, e.Executant, gatewayClient)
if err != nil {
logger.Error().
Err(err).
@@ -134,31 +151,34 @@ func (s eventsNotifier) handleSpaceUnshared(e events.SpaceUnshared) {
return
}
// Note: We're using the 'executantCtx' (authenticated as the share executant) here for requesting
// the Grantees of the shares. Ideally the notfication service would use some kind of service
// user for this.
granteeList := s.ensureGranteeList(ctx, executant.GetId(), e.GranteeUserID, e.GranteeGroupID)
filteredGrantees := s.filter.execute(ctx, granteeList, defaults.SettingUUIDProfileEventSpaceUnshared)
if filteredGrantees == nil {
resourceID, err := storagespace.ParseID(e.ID.OpaqueId)
if err != nil {
logger.Error().
Err(err).
Msg("could not parse resourceid from ItemID ")
return
}
sharerDisplayName := executant.GetDisplayName()
recipientList, err := s.render(ctx, email.UnsharedSpace,
"SpaceGrantee",
map[string]string{
"SpaceSharer": sharerDisplayName,
"SpaceName": resourceInfo.GetSpace().Name,
"ShareLink": shareLink,
}, filteredGrantees, sharerDisplayName)
resourceInfo, err := s.getResourceInfo(ctx, &resourceID, nil)
if err != nil {
s.logger.Error().Err(err).Str("event", "UnsharedSpace").Msg("Could not get render the email")
logger.Error().
Err(err).
Msg("could not get space info")
return
}
s.send(ctx, recipientList)
spaceName = resourceInfo.GetSpace().GetName()
shareLink, err = urlJoinPath(s.ocisURL, "f", e.ID.OpaqueId)
if err != nil {
logger.Error().
Err(err).
Msg("could not create link to the share")
return
}
return
}
func (s eventsNotifier) handleSpaceMembershipExpired(e events.SpaceMembershipExpired) {
func (s eventsNotifier) handleSpaceMembershipExpired(e events.SpaceMembershipExpired, eventId string) {
logger := s.logger.With().
Str("event", "SpaceMembershipExpired").
Str("itemid", e.SpaceID.GetOpaqueId()).
@@ -189,19 +209,23 @@ func (s eventsNotifier) handleSpaceMembershipExpired(e events.SpaceMembershipExp
return
}
filteredGrantees := s.filter.execute(ctx, granteeList, defaults.SettingUUIDProfileEventSpaceMembershipExpired)
if filteredGrantees == nil {
recipientsInstant, recipientsDaily, recipientsInstantWeekly := s.splitter.execute(ctx, filteredGrantees)
recipientsInstant = append(recipientsInstant, s.userEventStore.persist(intervalDaily, eventId, recipientsDaily)...)
recipientsInstant = append(recipientsInstant, s.userEventStore.persist(intervalWeekly, eventId, recipientsInstantWeekly)...)
if recipientsInstant == nil {
return
}
recipientList, err := s.render(ctx, email.MembershipExpired,
emails, err := s.render(ctx, email.MembershipExpired,
"SpaceGrantee",
map[string]string{
"SpaceName": e.SpaceName,
"ExpiredAt": e.ExpiredAt.Format("2006-01-02 15:04:05"),
}, filteredGrantees, owner.GetDisplayName())
}, recipientsInstant, owner.GetDisplayName())
if err != nil {
s.logger.Error().Err(err).Str("event", "SpaceUnshared").Msg("could not get render the email")
logger.Error().Err(err).Msg("could not get render the email")
return
}
s.send(ctx, recipientList)
s.send(ctx, emails)
}

View File

@@ -0,0 +1,61 @@
package service
import (
"context"
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/ocis-pkg/middleware"
settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0"
"github.com/owncloud/ocis/v2/services/settings/pkg/store/defaults"
"github.com/pkg/errors"
micrometadata "go-micro.dev/v4/metadata"
)
type intervalSplitter struct {
log log.Logger
valueClient settingssvc.ValueService
persistenceService userEventStore
}
func newIntervalSplitter(l log.Logger, vc settingssvc.ValueService) *intervalSplitter {
return &intervalSplitter{log: l, valueClient: vc}
}
// execute splits users into 3 lists depending on their email sending interval settings
func (s intervalSplitter) execute(ctx context.Context, users []*user.User) (instant, daily, weekly []*user.User) {
for _, u := range users {
userId := u.GetId().GetOpaqueId()
interval, err := getEmailSendingInterval(ctx, s.valueClient, userId)
if err != nil {
s.log.Error().Err(err).Str("userId", userId).Msg("cannot get user email sending interval")
instant = append(instant, u)
} else if interval == "instant" {
instant = append(instant, u)
} else if interval == intervalDaily {
daily = append(daily, u)
} else if interval == intervalWeekly {
weekly = append(weekly, u)
}
}
return
}
func getEmailSendingInterval(ctx context.Context, vc settingssvc.ValueService, userId string) (string, error) {
resp, err := vc.GetValueByUniqueIdentifiers(
micrometadata.Set(ctx, middleware.AccountID, userId),
&settingssvc.GetValueByUniqueIdentifiersRequest{
AccountUuid: userId,
SettingId: defaults.SettingUUIDProfileEmailSendingInterval,
},
)
if err != nil {
return "", err
}
val := resp.GetValue().GetValue().GetStringValue()
if val == "" {
return "", errors.New("email sending interval is empty")
}
return val, nil
}

View File

@@ -0,0 +1,196 @@
package service
import (
"context"
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
settingsmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/settings/v0"
settings "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0"
v0 "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"go-micro.dev/v4/client"
"strings"
"testing"
)
func Test_intervalSplitter_execute(t *testing.T) {
type fields struct {
log log.Logger
valueClient v0.ValueService
}
type args struct {
ctx context.Context
users []*user.User
settingId string
}
tests := []struct {
name string
fields fields
args args
wantInstant []*user.User
wantDaily []*user.User
wantWeekly []*user.User
}{
{"no connection to ValueService",
fields{
log: testLogger,
valueClient: settings.MockValueService{
GetValueByUniqueIdentifiersFunc: func(ctx context.Context, req *settings.GetValueByUniqueIdentifiersRequest, opts ...client.CallOption) (*settings.GetValueResponse, error) {
return nil, errors.New("no connection to ValueService")
}}}, args{
ctx: context.TODO(),
users: newUsers("foo"),
settingId: "",
},
newUsers("foo"), []*user.User(nil), []*user.User(nil),
},
{"no setting in ValueService response",
fields{
log: testLogger,
valueClient: settings.MockValueService{
GetValueByUniqueIdentifiersFunc: func(ctx context.Context, req *settings.GetValueByUniqueIdentifiersRequest, opts ...client.CallOption) (*settings.GetValueResponse, error) {
return &settings.GetValueResponse{}, nil
}}},
args{
ctx: context.TODO(),
users: newUsers("foo"),
settingId: "",
},
newUsers("foo"), []*user.User(nil), []*user.User(nil),
},
{"ValueService nil response",
fields{
log: testLogger,
valueClient: settings.MockValueService{
GetValueByUniqueIdentifiersFunc: func(ctx context.Context, req *settings.GetValueByUniqueIdentifiersRequest, opts ...client.CallOption) (*settings.GetValueResponse, error) {
return nil, nil
}}},
args{
ctx: context.TODO(),
users: newUsers("foo"),
settingId: "",
},
newUsers("foo"), []*user.User(nil), []*user.User(nil),
},
{"input users nil",
fields{
log: testLogger,
valueClient: settings.MockValueService{
GetValueByUniqueIdentifiersFunc: func(ctx context.Context, req *settings.GetValueByUniqueIdentifiersRequest, opts ...client.CallOption) (*settings.GetValueResponse, error) {
return nil, nil
}},
},
args{
ctx: context.TODO(),
users: nil,
},
[]*user.User(nil), []*user.User(nil), []*user.User(nil),
},
{"interval never",
fields{
log: testLogger,
valueClient: newStringValueMockValueService("never"),
},
args{
ctx: context.TODO(),
users: newUsers("foo"),
},
[]*user.User(nil), []*user.User(nil), []*user.User(nil),
},
{"interval instant",
fields{
log: testLogger,
valueClient: newStringValueMockValueService("instant"),
},
args{
ctx: context.TODO(),
users: newUsers("foo"),
},
newUsers("foo"), []*user.User(nil), []*user.User(nil),
},
{"interval daily",
fields{
log: testLogger,
valueClient: newStringValueMockValueService("daily"),
},
args{
ctx: context.TODO(),
users: newUsers("foo"),
},
[]*user.User(nil), newUsers("foo"), []*user.User(nil),
},
{"interval weekly",
fields{
log: testLogger,
valueClient: newStringValueMockValueService("weekly"),
},
args{
ctx: context.TODO(),
users: newUsers("foo"),
},
[]*user.User(nil), []*user.User(nil), newUsers("foo"),
},
{"multiple users and intervals",
fields{
log: testLogger,
valueClient: settings.MockValueService{
GetValueByUniqueIdentifiersFunc: func(ctx context.Context, req *settings.GetValueByUniqueIdentifiersRequest, opts ...client.CallOption) (*settings.GetValueResponse, error) {
if strings.Contains(req.AccountUuid, "never") {
return newGetValueResponseStringValue("never"), nil
} else if strings.Contains(req.AccountUuid, "instant") {
return newGetValueResponseStringValue("instant"), nil
} else if strings.Contains(req.AccountUuid, "daily") {
return newGetValueResponseStringValue("daily"), nil
} else if strings.Contains(req.AccountUuid, "weekly") {
return newGetValueResponseStringValue("weekly"), nil
}
return nil, nil
}},
},
args{
ctx: context.TODO(),
users: newUsers("never1", "instant1", "daily1", "weekly1", "never2", "instant2", "daily2", "weekly2"),
},
newUsers("instant1", "instant2"), newUsers("daily1", "daily2"), newUsers("weekly1", "weekly2"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := intervalSplitter{
log: tt.fields.log,
valueClient: tt.fields.valueClient,
}
gotInstant, gotDaily, gotWeekly := s.execute(tt.args.ctx, tt.args.users)
assert.Equalf(t, tt.wantInstant, gotInstant, "execute(%v, %v, %v)", tt.args.ctx, tt.args.users)
assert.Equalf(t, tt.wantDaily, gotDaily, "execute(%v, %v, %v)", tt.args.ctx, tt.args.users)
assert.Equalf(t, tt.wantWeekly, gotWeekly, "execute(%v, %v, %v)", tt.args.ctx, tt.args.users)
})
}
}
func newStringValueMockValueService(strVal string) settings.ValueService {
return settings.MockValueService{
GetValueByUniqueIdentifiersFunc: func(ctx context.Context, req *settings.GetValueByUniqueIdentifiersRequest, opts ...client.CallOption) (*settings.GetValueResponse, error) {
return newGetValueResponseStringValue(strVal), nil
},
}
}
func newGetValueResponseStringValue(strVal string) *settings.GetValueResponse {
return &settings.GetValueResponse{Value: &settingsmsg.ValueWithIdentifier{
Value: &settingsmsg.Value{
Value: &settingsmsg.Value_StringValue{
StringValue: strVal,
},
},
}}
}
func newUsers(ids ...string) []*user.User {
var users []*user.User
for _, s := range ids {
users = append(users, &user.User{Id: &user.UserId{OpaqueId: s}})
}
return users
}