diff --git a/services/notifications/pkg/command/server.go b/services/notifications/pkg/command/server.go index 8aa1e1bec7..309bc603f0 100644 --- a/services/notifications/pkg/command/server.go +++ b/services/notifications/pkg/command/server.go @@ -3,6 +3,10 @@ package command import ( "context" "fmt" + "github.com/cs3org/reva/v2/pkg/store" + ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0" + microstore "go-micro.dev/v4/store" + "reflect" "github.com/oklog/run" "github.com/urfave/cli/v2" @@ -81,7 +85,14 @@ func Server(cfg *config.Config) *cli.Command { events.SpaceUnshared{}, events.SpaceMembershipExpired{}, events.ScienceMeshInviteTokenGenerated{}, + events.SendEmailsEvent{}, } + registeredEvents := make(map[string]events.Unmarshaller) + for _, e := range evs { + typ := reflect.TypeOf(e) + registeredEvents[typ.String()] = e + } + client, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Notifications.Events)) if err != nil { return err @@ -109,7 +120,21 @@ func Server(cfg *config.Config) *cli.Command { logger.Fatal().Err(err).Str("addr", cfg.Notifications.RevaGateway).Msg("could not get reva gateway selector") } valueService := settingssvc.NewValueService("com.owncloud.api.settings", grpcClient) - svc := service.NewEventsNotifier(evts, channel, logger, gatewaySelector, valueService, cfg.ServiceAccount.ServiceAccountID, cfg.ServiceAccount.ServiceAccountSecret, cfg.Notifications.EmailTemplatePath, cfg.Notifications.DefaultLanguage, cfg.WebUIURL, cfg.Notifications.TranslationPath) + historyClient := ehsvc.NewEventHistoryService("com.owncloud.api.eventhistory", grpcClient) + + notificationStore := store.Create( + store.Store(cfg.Store.Store), + store.TTL(cfg.Store.TTL), + microstore.Nodes(cfg.Store.Nodes...), + microstore.Database(cfg.Store.Database), + microstore.Table(cfg.Store.Table), + store.Authentication(cfg.Store.AuthUsername, cfg.Store.AuthPassword), + ) + + svc := service.NewEventsNotifier(evts, channel, logger, gatewaySelector, valueService, + cfg.ServiceAccount.ServiceAccountID, cfg.ServiceAccount.ServiceAccountSecret, + cfg.Notifications.EmailTemplatePath, cfg.Notifications.DefaultLanguage, cfg.WebUIURL, + cfg.Notifications.TranslationPath, cfg.Notifications.SMTP.Sender, notificationStore, historyClient, registeredEvents) gr.Add(svc.Run, func(error) { cancel() diff --git a/services/notifications/pkg/config/config.go b/services/notifications/pkg/config/config.go index 280a695470..9dd24773b2 100644 --- a/services/notifications/pkg/config/config.go +++ b/services/notifications/pkg/config/config.go @@ -3,6 +3,7 @@ package config import ( "context" + "time" "github.com/owncloud/ocis/v2/ocis-pkg/shared" ) @@ -24,6 +25,8 @@ type Config struct { ServiceAccount ServiceAccount `yaml:"service_account"` Context context.Context `yaml:"-"` + + Store Store `yaml:"store"` } // Notifications defines the config options for the notifications service. @@ -65,3 +68,20 @@ type ServiceAccount struct { ServiceAccountID string `yaml:"service_account_id" env:"OCIS_SERVICE_ACCOUNT_ID;NOTIFICATIONS_SERVICE_ACCOUNT_ID" desc:"The ID of the service account the service should use. See the 'auth-service' service description for more details." introductionVersion:"5.0"` ServiceAccountSecret string `yaml:"service_account_secret" env:"OCIS_SERVICE_ACCOUNT_SECRET;NOTIFICATIONS_SERVICE_ACCOUNT_SECRET" desc:"The service account secret." introductionVersion:"5.0"` } + +// TODO: +// - README +// - DOCS +// - introductionVersion correct? +// - is TTL mandatory? + +// Store configures the store to use +type Store struct { + Store string `yaml:"store" env:"OCIS_PERSISTENT_STORE;NOTIFICATIONS_STORE" desc:"The type of the store. Supported values are: 'memory', 'nats-js-kv', 'redis-sentinel', 'noop'. See the text description for details." introductionVersion:"7.1"` + Nodes []string `yaml:"nodes" env:"OCIS_PERSISTENT_STORE_NODES;NOTIFICATIONS_STORE_NODES" desc:"A list of nodes to access the configured store. This has no effect when 'memory' store is configured. Note that the behaviour how nodes are used is dependent on the library of the configured store. See the Environment Variable Types description for more details." introductionVersion:"7.1"` + Database string `yaml:"database" env:"NOTIFICATIONS_STORE_DATABASE" desc:"The database name the configured store should use." introductionVersion:"7.1"` + Table string `yaml:"table" env:"NOTIFICATIONS_STORE_TABLE" desc:"The database table the store should use." introductionVersion:"7.1"` + TTL time.Duration `yaml:"ttl" env:"OCIS_PERSISTENT_STORE_TTL;NOTIFICATIONS_STORE_TTL" desc:"Time to live for notifications in the store. Defaults to '336h' (2 weeks). See the Environment Variable Types description for more details." introductionVersion:"7.1"` + AuthUsername string `yaml:"username" env:"OCIS_PERSISTENT_STORE_AUTH_USERNAME;NOTIFICATIONS_STORE_AUTH_USERNAME" desc:"The username to authenticate with the store. Only applies when store type 'nats-js-kv' is configured." introductionVersion:"7.1"` + AuthPassword string `yaml:"password" env:"OCIS_PERSISTENT_STORE_AUTH_PASSWORD;NOTIFICATIONS_STORE_AUTH_PASSWORD" desc:"The password to authenticate with the store. Only applies when store type 'nats-js-kv' is configured." introductionVersion:"7.1"` +} diff --git a/services/notifications/pkg/config/defaults/defaultconfig.go b/services/notifications/pkg/config/defaults/defaultconfig.go index 3a5ff26192..69b94c91d7 100644 --- a/services/notifications/pkg/config/defaults/defaultconfig.go +++ b/services/notifications/pkg/config/defaults/defaultconfig.go @@ -4,6 +4,7 @@ import ( "github.com/owncloud/ocis/v2/ocis-pkg/shared" "github.com/owncloud/ocis/v2/ocis-pkg/structs" "github.com/owncloud/ocis/v2/services/notifications/pkg/config" + "time" ) // FullDefaultConfig returns a fully initialized default configuration @@ -40,6 +41,13 @@ func DefaultConfig() *config.Config { }, RevaGateway: shared.DefaultRevaConfig().Address, }, + Store: config.Store{ + Store: "nats-js-kv", + Nodes: []string{"127.0.0.1:9233"}, + Database: "notifications", + Table: "", + TTL: 336 * time.Hour, + }, } } diff --git a/services/notifications/pkg/service/persistence.go b/services/notifications/pkg/service/persistence.go new file mode 100644 index 0000000000..b08deefdf8 --- /dev/null +++ b/services/notifications/pkg/service/persistence.go @@ -0,0 +1,116 @@ +package service + +import ( + "context" + "encoding/json" + user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + "github.com/owncloud/ocis/v2/ocis-pkg/log" + v0 "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/eventhistory/v0" + ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0" + "github.com/pkg/errors" + "go-micro.dev/v4/store" +) + +type userEventStore struct { + log log.Logger + store store.Store + historyClient ehsvc.EventHistoryService +} + +type userEventIds struct { + User *user.User `json:"user"` + EventIds []string `json:"event_ids"` +} + +type userEvents struct { + User *user.User + Events []*v0.Event +} + +const ( + intervalDaily = "daily" + intervalWeekly = "weekly" +) + +func newUserEventStore(l log.Logger, s store.Store, hc ehsvc.EventHistoryService) *userEventStore { + return &userEventStore{log: l, store: s, historyClient: hc} +} + +func (s *userEventStore) persist(interval string, eventId string, users []*user.User) []*user.User { + var errorUsers []*user.User + for _, u := range users { + key := interval + u.Id.OpaqueId + + // Note: This is not thread safe and can result in missing events + records, err := s.store.Read(key) + if err != nil && err != store.ErrNotFound { + s.log.Error().Err(err).Str("eventId", eventId).Str("userId", u.Id.OpaqueId).Msg("cannot read record") + errorUsers = append(errorUsers, u) + continue + } + var record userEventIds + if len(records) == 0 { + record = userEventIds{} + } else { + if err = json.Unmarshal(records[0].Value, &record); err != nil { + s.log.Warn().Err(err).Str("eventId", eventId).Str("userId", u.Id.OpaqueId).Msg("cannot unmarshal json") + errorUsers = append(errorUsers, u) + continue + } + } + record.User = u + record.EventIds = append(record.EventIds, eventId) + b, err := json.Marshal(record) + if err != nil { + s.log.Warn().Err(err).Str("eventId", eventId).Str("userId", u.Id.OpaqueId).Msg("cannot marshal record") + errorUsers = append(errorUsers, u) + continue + } + err = s.store.Write(&store.Record{ + Key: key, + Value: b, + }) + if err != nil { + s.log.Error().Err(err).Str("eventId", eventId).Str("userId", u.Id.OpaqueId).Msg("cannot write record") + errorUsers = append(errorUsers, u) + continue + } + + } + return errorUsers +} + +func (s *userEventStore) listKeys(interval string) ([]string, error) { + return s.store.List(store.ListPrefix(interval)) +} + +func (s *userEventStore) pop(ctx context.Context, key string) (*userEvents, error) { + records, err := s.store.Read(key) + if err != nil && err != store.ErrNotFound { + return nil, errors.New("cannot get records") + } + if len(records) == 0 { + return nil, errors.New("no records found") + } + var record userEventIds + err = json.Unmarshal(records[0].Value, &record) + if err != nil { + s.log.Warn().Err(err).Str("key", key).Msg("cannot unmarshal json") + return nil, err + } + + res, err := s.historyClient.GetEvents(ctx, &ehsvc.GetEventsRequest{Ids: record.EventIds}) + if err != nil { + s.log.Error().Err(err).Strs("eventIds", record.EventIds).Msg("cannot get events") + return nil, err + } + err = s.store.Delete(key) + if err != nil { + s.log.Error().Err(err).Strs("eventIds", record.EventIds).Msg("cannot delete records") + return nil, err + } + return &userEvents{ + User: record.User, + Events: res.GetEvents(), + }, nil +} diff --git a/services/notifications/pkg/service/sciencemesh.go b/services/notifications/pkg/service/sciencemesh.go index 98d01f71a6..a1c260414a 100644 --- a/services/notifications/pkg/service/sciencemesh.go +++ b/services/notifications/pkg/service/sciencemesh.go @@ -71,7 +71,7 @@ func (s eventsNotifier) handleScienceMeshInviteTokenGenerated(e events.ScienceMe msgENV, ) if err != nil { - s.logger.Error().Err(err).Msg("building the message has failed") + logger.Error().Err(err).Msg("building the message has failed") return } diff --git a/services/notifications/pkg/service/service.go b/services/notifications/pkg/service/service.go index 4016249e9a..0f3bac70e9 100644 --- a/services/notifications/pkg/service/service.go +++ b/services/notifications/pkg/service/service.go @@ -4,6 +4,8 @@ import ( "context" "errors" "fmt" + ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0" + "go-micro.dev/v4/store" "net/url" "os" "os/signal" @@ -50,7 +52,10 @@ func NewEventsNotifier( logger log.Logger, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], valueService settingssvc.ValueService, - serviceAccountID, serviceAccountSecret, emailTemplatePath, defaultLanguage, ocisURL, translationPath string) Service { + serviceAccountID, serviceAccountSecret, emailTemplatePath, defaultLanguage, ocisURL, translationPath, emailSender string, + store store.Store, + historyClient ehsvc.EventHistoryService, + registeredEvents map[string]events.Unmarshaller) Service { return eventsNotifier{ logger: logger, @@ -63,9 +68,13 @@ func NewEventsNotifier( serviceAccountSecret: serviceAccountSecret, emailTemplatePath: emailTemplatePath, defaultLanguage: defaultLanguage, + defaultEmailSender: emailSender, ocisURL: ocisURL, translationPath: translationPath, filter: newNotificationFilter(logger, valueService), + splitter: newIntervalSplitter(logger, valueService), + userEventStore: newUserEventStore(logger, store, historyClient), + registeredEvents: registeredEvents, } } @@ -79,10 +88,14 @@ type eventsNotifier struct { emailTemplatePath string translationPath string defaultLanguage string + defaultEmailSender string ocisURL string serviceAccountID string serviceAccountSecret string filter *notificationFilter + splitter *intervalSplitter + userEventStore *userEventStore + registeredEvents map[string]events.Unmarshaller } func (s eventsNotifier) Run() error { @@ -95,15 +108,15 @@ func (s eventsNotifier) Run() error { go func() { switch e := evt.Event.(type) { case events.SpaceShared: - s.handleSpaceShared(e) + s.handleSpaceShared(e, evt.ID) case events.SpaceUnshared: - s.handleSpaceUnshared(e) + s.handleSpaceUnshared(e, evt.ID) case events.SpaceMembershipExpired: - s.handleSpaceMembershipExpired(e) + s.handleSpaceMembershipExpired(e, evt.ID) case events.ShareCreated: - s.handleShareCreated(e) + s.handleShareCreated(e, evt.ID) case events.ShareExpired: - s.handleShareExpired(e) + s.handleShareExpired(e, evt.ID) case events.ScienceMeshInviteTokenGenerated: s.handleScienceMeshInviteTokenGenerated(e) } @@ -135,8 +148,8 @@ func (s eventsNotifier) render(ctx context.Context, template email.MessageTempla return messageList, nil } -func (s eventsNotifier) send(ctx context.Context, recipientList []*channels.Message) { - for _, r := range recipientList { +func (s eventsNotifier) send(ctx context.Context, emails []*channels.Message) { + for _, r := range emails { err := s.channel.SendMessage(ctx, r) if err != nil { s.logger.Error().Err(err).Str("event", "SendEmail").Msg("failed to send a message") diff --git a/services/notifications/pkg/service/service_test.go b/services/notifications/pkg/service/service_test.go index e2ab94e132..9ba58acb4a 100644 --- a/services/notifications/pkg/service/service_test.go +++ b/services/notifications/pkg/service/service_test.go @@ -2,6 +2,7 @@ package service_test import ( "context" + "github.com/cs3org/reva/v2/pkg/store" settingsmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/settings/v0" "time" @@ -88,12 +89,14 @@ var _ = Describe("Notifications", func() { } }) - DescribeTable("Sending notifications", + DescribeTable("Sending userEventIds", func(tc testChannel, ev events.Event) { cfg := defaults.FullDefaultConfig() cfg.GRPCClientTLS = &shared.GRPCClientTLS{} ch := make(chan events.Event) - evts := service.NewEventsNotifier(ch, tc, log.NewLogger(), gatewaySelector, vs, "", "", "", "", "", "") + evts := service.NewEventsNotifier(ch, tc, log.NewLogger(), gatewaySelector, vs, "", + "", "", "", "", "", "", + store.Create(), nil, nil) go evts.Run() ch <- ev @@ -301,12 +304,14 @@ var _ = Describe("Notifications X-Site Scripting", func() { } }) - DescribeTable("Sending notifications", + DescribeTable("Sending userEventIds", func(tc testChannel, ev events.Event) { cfg := defaults.FullDefaultConfig() cfg.GRPCClientTLS = &shared.GRPCClientTLS{} ch := make(chan events.Event) - evts := service.NewEventsNotifier(ch, tc, log.NewLogger(), gatewaySelector, vs, "", "", "", "", "", "") + evts := service.NewEventsNotifier(ch, tc, log.NewLogger(), gatewaySelector, vs, "", + "", "", "", "", "", "", + store.Create(), nil, nil) go evts.Run() ch <- ev diff --git a/services/notifications/pkg/service/shares.go b/services/notifications/pkg/service/shares.go index 1413b3e049..58aee3b601 100644 --- a/services/notifications/pkg/service/shares.go +++ b/services/notifications/pkg/service/shares.go @@ -1,29 +1,63 @@ package service import ( + "context" + user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/utils" + "github.com/owncloud/ocis/v2/ocis-pkg/log" "github.com/owncloud/ocis/v2/services/notifications/pkg/email" "github.com/owncloud/ocis/v2/services/settings/pkg/store/defaults" "google.golang.org/protobuf/types/known/fieldmaskpb" ) -func (s eventsNotifier) handleShareCreated(e events.ShareCreated) { - logger := s.logger.With(). +func (s eventsNotifier) handleShareCreated(e events.ShareCreated, eventId string) { + logger := log.Logger{Logger: s.logger.With(). Str("event", "ShareCreated"). Str("itemid", e.ItemID.OpaqueId). - Logger() + Logger()} + owner, shareFolder, shareLink, ctx, err := s.prepareShareCreated(logger, e) + if err != nil { + logger.Error().Err(err).Msg("could not prepare vars for email") + return + } + + granteeList := s.ensureGranteeList(ctx, owner.GetId(), e.GranteeUserID, e.GranteeGroupID) + filteredGrantees := s.filter.execute(ctx, granteeList, defaults.SettingUUIDProfileEventShareCreated) + + recipientsInstant, recipientsDaily, recipientsInstantWeekly := s.splitter.execute(ctx, filteredGrantees) + recipientsInstant = append(recipientsInstant, s.userEventStore.persist(intervalDaily, eventId, recipientsDaily)...) + recipientsInstant = append(recipientsInstant, s.userEventStore.persist(intervalWeekly, eventId, recipientsInstantWeekly)...) + if recipientsInstant == nil { + return + } + + sharerDisplayName := owner.GetDisplayName() + emails, err := s.render(ctx, email.ShareCreated, + "ShareGrantee", + map[string]string{ + "ShareSharer": sharerDisplayName, + "ShareFolder": shareFolder, + "ShareLink": shareLink, + }, recipientsInstant, sharerDisplayName) + if err != nil { + logger.Error().Err(err).Msg("could not get render the email") + return + } + s.send(ctx, emails) +} + +func (s eventsNotifier) prepareShareCreated(logger log.Logger, e events.ShareCreated) (owner *user.User, shareFolder, shareLink string, ctx context.Context, err error) { gatewayClient, err := s.gatewaySelector.Next() if err != nil { logger.Error().Err(err).Msg("could not select next gateway client") return } - ctx, err := utils.GetServiceUserContext(s.serviceAccountID, gatewayClient, s.serviceAccountSecret) + ctx, err = utils.GetServiceUserContextWithContext(context.Background(), gatewayClient, s.serviceAccountID, s.serviceAccountSecret) if err != nil { - logger.Error().Err(err).Msg("Could not impersonate service user") - return + logger.Error().Err(err).Msg("could not get service user context") } resourceInfo, err := s.getResourceInfo(ctx, e.ItemID, &fieldmaskpb.FieldMask{Paths: []string{"name"}}) @@ -33,8 +67,9 @@ func (s eventsNotifier) handleShareCreated(e events.ShareCreated) { Msg("could not stat resource") return } + shareFolder = resourceInfo.Name - shareLink, err := urlJoinPath(s.ocisURL, "files/shares/with-me") + shareLink, err = urlJoinPath(s.ocisURL, "files/shares/with-me") if err != nil { logger.Error(). Err(err). @@ -42,38 +77,22 @@ func (s eventsNotifier) handleShareCreated(e events.ShareCreated) { return } - owner, err := utils.GetUser(e.Sharer, gatewayClient) + owner, err = utils.GetUserWithContext(ctx, e.Sharer, gatewayClient) if err != nil { - logger.Error().Err(err).Msg("Could not get user") + logger.Error(). + Err(err). + Msg("could not get user") return } - granteeList := s.ensureGranteeList(ctx, owner.GetId(), e.GranteeUserID, e.GranteeGroupID) - filteredGrantees := s.filter.execute(ctx, granteeList, defaults.SettingUUIDProfileEventShareCreated) - if filteredGrantees == nil { - return - } - - sharerDisplayName := owner.GetDisplayName() - recipientList, err := s.render(ctx, email.ShareCreated, - "ShareGrantee", - map[string]string{ - "ShareSharer": sharerDisplayName, - "ShareFolder": resourceInfo.Name, - "ShareLink": shareLink, - }, filteredGrantees, sharerDisplayName) - if err != nil { - s.logger.Error().Err(err).Str("event", "ShareCreated").Msg("could not get render the email") - return - } - s.send(ctx, recipientList) + return } -func (s eventsNotifier) handleShareExpired(e events.ShareExpired) { - logger := s.logger.With(). +func (s eventsNotifier) handleShareExpired(e events.ShareExpired, eventId string) { + logger := log.Logger{Logger: s.logger.With(). Str("event", "ShareExpired"). Str("itemid", e.ItemID.GetOpaqueId()). - Logger() + Logger()} gatewayClient, err := s.gatewaySelector.Next() if err != nil { @@ -81,12 +100,53 @@ func (s eventsNotifier) handleShareExpired(e events.ShareExpired) { return } - ctx, err := utils.GetServiceUserContext(s.serviceAccountID, gatewayClient, s.serviceAccountSecret) + shareFolder, ctx, err := s.prepareShareExpired(logger, e) if err != nil { - logger.Error().Err(err).Msg("Could not impersonate sharer") + logger.Error().Err(err).Msg("could not prepare vars for email") return } + owner, err := utils.GetUserWithContext(ctx, e.ShareOwner, gatewayClient) + if err != nil { + logger.Error().Err(err).Msg("Could not get user") + return + } + + granteeList := s.ensureGranteeList(ctx, owner.GetId(), e.GranteeUserID, e.GranteeGroupID) + filteredGrantees := s.filter.execute(ctx, granteeList, defaults.SettingUUIDProfileEventShareExpired) + + recipientsInstant, recipientsDaily, recipientsInstantWeekly := s.splitter.execute(ctx, filteredGrantees) + recipientsInstant = append(recipientsInstant, s.userEventStore.persist(intervalDaily, eventId, recipientsDaily)...) + recipientsInstant = append(recipientsInstant, s.userEventStore.persist(intervalWeekly, eventId, recipientsInstantWeekly)...) + if recipientsInstant == nil { + return + } + + emails, err := s.render(ctx, email.ShareExpired, + "ShareGrantee", + map[string]string{ + "ShareFolder": shareFolder, + "ExpiredAt": e.ExpiredAt.Format("2006-01-02 15:04:05"), + }, recipientsInstant, owner.GetDisplayName()) + if err != nil { + logger.Error().Err(err).Msg("could not get render the email") + return + } + s.send(ctx, emails) +} + +func (s eventsNotifier) prepareShareExpired(logger log.Logger, e events.ShareExpired) (shareFolder string, ctx context.Context, err error) { + gatewayClient, err := s.gatewaySelector.Next() + if err != nil { + logger.Error().Err(err).Msg("could not select next gateway client") + return + } + + ctx, err = utils.GetServiceUserContextWithContext(context.Background(), gatewayClient, s.serviceAccountID, s.serviceAccountSecret) + if err != nil { + logger.Error().Err(err).Msg("could not get service user context") + } + resourceInfo, err := s.getResourceInfo(ctx, e.ItemID, &fieldmaskpb.FieldMask{Paths: []string{"name"}}) if err != nil { logger.Error(). @@ -94,28 +154,7 @@ func (s eventsNotifier) handleShareExpired(e events.ShareExpired) { Msg("could not stat resource") return } + shareFolder = resourceInfo.GetName() - owner, err := utils.GetUser(e.ShareOwner, gatewayClient) - if err != nil { - logger.Error().Err(err).Msg("Could not get user") - return - } - - granteeList := s.ensureGranteeList(ctx, owner.GetId(), e.GranteeUserID, e.GranteeGroupID) - filteredGrantees := s.filter.execute(ctx, granteeList, defaults.SettingUUIDProfileEventShareExpired) - if filteredGrantees == nil { - return - } - - recipientList, err := s.render(ctx, email.ShareExpired, - "ShareGrantee", - map[string]string{ - "ShareFolder": resourceInfo.GetName(), - "ExpiredAt": e.ExpiredAt.Format("2006-01-02 15:04:05"), - }, filteredGrantees, owner.GetDisplayName()) - if err != nil { - s.logger.Error().Err(err).Str("event", "ShareExpired").Msg("could not get render the email") - return - } - s.send(ctx, recipientList) + return } diff --git a/services/notifications/pkg/service/spaces.go b/services/notifications/pkg/service/spaces.go index 5f1b4c827c..91ac5f204a 100644 --- a/services/notifications/pkg/service/spaces.go +++ b/services/notifications/pkg/service/spaces.go @@ -1,104 +1,69 @@ package service import ( + "context" + user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/storagespace" "github.com/cs3org/reva/v2/pkg/utils" + "github.com/owncloud/ocis/v2/ocis-pkg/log" "github.com/owncloud/ocis/v2/services/notifications/pkg/email" "github.com/owncloud/ocis/v2/services/settings/pkg/store/defaults" ) -func (s eventsNotifier) handleSpaceShared(e events.SpaceShared) { - logger := s.logger.With(). +func (s eventsNotifier) handleSpaceShared(e events.SpaceShared, eventId string) { + logger := log.Logger{Logger: s.logger.With(). Str("event", "SpaceShared"). Str("itemid", e.ID.OpaqueId). - Logger() - - gatewayClient, err := s.gatewaySelector.Next() + Logger()} + executant, spaceName, shareLink, ctx, err := s.prepareSpaceShared(logger, e) if err != nil { - logger.Error().Err(err).Msg("could not select next gateway client") + logger.Error().Err(err).Msg("could not prepare vars for email") return } - ctx, err := utils.GetServiceUserContext(s.serviceAccountID, gatewayClient, s.serviceAccountSecret) - if err != nil { - logger.Error(). - Err(err). - Msg("could not handle space shared event") - return - } - - resourceID, err := storagespace.ParseID(e.ID.OpaqueId) - if err != nil { - logger.Error(). - Err(err). - Msg("could not parse resourceid from ItemID ") - return - } - - resourceInfo, err := s.getResourceInfo(ctx, &resourceID, nil) - if err != nil { - logger.Error(). - Err(err). - Msg("could not get space info") - return - } - - shareLink, err := urlJoinPath(s.ocisURL, "f", e.ID.OpaqueId) - if err != nil { - logger.Error(). - Err(err). - Msg("could not create link to the share") - return - } - - executant, err := utils.GetUser(e.Executant, gatewayClient) - if err != nil { - logger.Error(). - Err(err). - Msg("could not get user") - return - } - - // Note: We're using the 'executantCtx' (authenticated as the share executant) here for requesting - // the Grantees of the shares. Ideally the notfication service would use some kind of service - // user for this. granteeList := s.ensureGranteeList(ctx, executant.GetId(), e.GranteeUserID, e.GranteeGroupID) filteredGrantees := s.filter.execute(ctx, granteeList, defaults.SettingUUIDProfileEventSpaceShared) - if filteredGrantees == nil { + + recipientsInstant, recipientsDaily, recipientsInstantWeekly := s.splitter.execute(ctx, filteredGrantees) + recipientsInstant = append(recipientsInstant, s.userEventStore.persist(intervalDaily, eventId, recipientsDaily)...) + recipientsInstant = append(recipientsInstant, s.userEventStore.persist(intervalWeekly, eventId, recipientsInstantWeekly)...) + if recipientsInstant == nil { return } sharerDisplayName := executant.GetDisplayName() - recipientList, err := s.render(ctx, email.SharedSpace, + emails, err := s.render(ctx, email.SharedSpace, "SpaceGrantee", map[string]string{ "SpaceSharer": sharerDisplayName, - "SpaceName": resourceInfo.GetSpace().GetName(), + "SpaceName": spaceName, "ShareLink": shareLink, - }, filteredGrantees, sharerDisplayName) + }, recipientsInstant, sharerDisplayName) if err != nil { - s.logger.Error().Err(err).Str("event", "SharedSpace").Msg("could not get render the email") + logger.Error().Err(err).Msg("could not get render the email") return } - s.send(ctx, recipientList) + s.send(ctx, emails) } -func (s eventsNotifier) handleSpaceUnshared(e events.SpaceUnshared) { - logger := s.logger.With(). - Str("event", "SpaceUnshared"). - Str("itemid", e.ID.OpaqueId). - Logger() - +func (s eventsNotifier) prepareSpaceShared(logger log.Logger, e events.SpaceShared) (executant *user.User, spaceName, shareLink string, ctx context.Context, err error) { gatewayClient, err := s.gatewaySelector.Next() if err != nil { logger.Error().Err(err).Msg("could not select next gateway client") return } - ctx, err := utils.GetServiceUserContext(s.serviceAccountID, gatewayClient, s.serviceAccountSecret) + ctx, err = utils.GetServiceUserContextWithContext(context.Background(), gatewayClient, s.serviceAccountID, s.serviceAccountSecret) if err != nil { - logger.Error().Err(err).Msg("could not handle space unshared event") + logger.Error().Err(err).Msg("could not get service user context") + } + + executant, err = utils.GetUserWithContext(ctx, e.Executant, gatewayClient) + if err != nil { + logger.Error(). + Err(err). + Msg("could not get user") return } @@ -117,16 +82,68 @@ func (s eventsNotifier) handleSpaceUnshared(e events.SpaceUnshared) { Msg("could not get space info") return } + spaceName = resourceInfo.GetSpace().GetName() - shareLink, err := urlJoinPath(s.ocisURL, "f", e.ID.OpaqueId) + shareLink, err = urlJoinPath(s.ocisURL, "f", e.ID.OpaqueId) if err != nil { logger.Error(). Err(err). Msg("could not create link to the share") return } + return +} - executant, err := utils.GetUser(e.Executant, gatewayClient) +func (s eventsNotifier) handleSpaceUnshared(e events.SpaceUnshared, eventId string) { + logger := log.Logger{Logger: s.logger.With(). + Str("event", "SpaceUnshared"). + Str("itemid", e.ID.OpaqueId). + Logger()} + + executant, spaceName, shareLink, ctx, err := s.prepareSpaceUnshared(logger, e) + if err != nil { + logger.Error().Err(err).Msg("could not prepare vars for email") + return + } + + granteeList := s.ensureGranteeList(ctx, executant.GetId(), e.GranteeUserID, e.GranteeGroupID) + filteredGrantees := s.filter.execute(ctx, granteeList, defaults.SettingUUIDProfileEventSpaceUnshared) + + recipientsInstant, recipientsDaily, recipientsInstantWeekly := s.splitter.execute(ctx, filteredGrantees) + recipientsInstant = append(recipientsInstant, s.userEventStore.persist(intervalDaily, eventId, recipientsDaily)...) + recipientsInstant = append(recipientsInstant, s.userEventStore.persist(intervalWeekly, eventId, recipientsInstantWeekly)...) + if recipientsInstant == nil { + return + } + + sharerDisplayName := executant.GetDisplayName() + emails, err := s.render(ctx, email.UnsharedSpace, + "SpaceGrantee", + map[string]string{ + "SpaceSharer": sharerDisplayName, + "SpaceName": spaceName, + "ShareLink": shareLink, + }, recipientsInstant, sharerDisplayName) + if err != nil { + logger.Error().Err(err).Msg("Could not get render the email") + return + } + s.send(ctx, emails) +} + +func (s eventsNotifier) prepareSpaceUnshared(logger log.Logger, e events.SpaceUnshared) (executant *user.User, spaceName, shareLink string, ctx context.Context, err error) { + gatewayClient, err := s.gatewaySelector.Next() + if err != nil { + logger.Error().Err(err).Msg("could not select next gateway client") + return + } + + ctx, err = utils.GetServiceUserContextWithContext(context.Background(), gatewayClient, s.serviceAccountID, s.serviceAccountSecret) + if err != nil { + logger.Error().Err(err).Msg("could not get service user context") + } + + executant, err = utils.GetUserWithContext(ctx, e.Executant, gatewayClient) if err != nil { logger.Error(). Err(err). @@ -134,31 +151,34 @@ func (s eventsNotifier) handleSpaceUnshared(e events.SpaceUnshared) { return } - // Note: We're using the 'executantCtx' (authenticated as the share executant) here for requesting - // the Grantees of the shares. Ideally the notfication service would use some kind of service - // user for this. - granteeList := s.ensureGranteeList(ctx, executant.GetId(), e.GranteeUserID, e.GranteeGroupID) - filteredGrantees := s.filter.execute(ctx, granteeList, defaults.SettingUUIDProfileEventSpaceUnshared) - if filteredGrantees == nil { + resourceID, err := storagespace.ParseID(e.ID.OpaqueId) + if err != nil { + logger.Error(). + Err(err). + Msg("could not parse resourceid from ItemID ") return } - sharerDisplayName := executant.GetDisplayName() - recipientList, err := s.render(ctx, email.UnsharedSpace, - "SpaceGrantee", - map[string]string{ - "SpaceSharer": sharerDisplayName, - "SpaceName": resourceInfo.GetSpace().Name, - "ShareLink": shareLink, - }, filteredGrantees, sharerDisplayName) + resourceInfo, err := s.getResourceInfo(ctx, &resourceID, nil) if err != nil { - s.logger.Error().Err(err).Str("event", "UnsharedSpace").Msg("Could not get render the email") + logger.Error(). + Err(err). + Msg("could not get space info") return } - s.send(ctx, recipientList) + spaceName = resourceInfo.GetSpace().GetName() + + shareLink, err = urlJoinPath(s.ocisURL, "f", e.ID.OpaqueId) + if err != nil { + logger.Error(). + Err(err). + Msg("could not create link to the share") + return + } + return } -func (s eventsNotifier) handleSpaceMembershipExpired(e events.SpaceMembershipExpired) { +func (s eventsNotifier) handleSpaceMembershipExpired(e events.SpaceMembershipExpired, eventId string) { logger := s.logger.With(). Str("event", "SpaceMembershipExpired"). Str("itemid", e.SpaceID.GetOpaqueId()). @@ -189,19 +209,23 @@ func (s eventsNotifier) handleSpaceMembershipExpired(e events.SpaceMembershipExp return } filteredGrantees := s.filter.execute(ctx, granteeList, defaults.SettingUUIDProfileEventSpaceMembershipExpired) - if filteredGrantees == nil { + + recipientsInstant, recipientsDaily, recipientsInstantWeekly := s.splitter.execute(ctx, filteredGrantees) + recipientsInstant = append(recipientsInstant, s.userEventStore.persist(intervalDaily, eventId, recipientsDaily)...) + recipientsInstant = append(recipientsInstant, s.userEventStore.persist(intervalWeekly, eventId, recipientsInstantWeekly)...) + if recipientsInstant == nil { return } - recipientList, err := s.render(ctx, email.MembershipExpired, + emails, err := s.render(ctx, email.MembershipExpired, "SpaceGrantee", map[string]string{ "SpaceName": e.SpaceName, "ExpiredAt": e.ExpiredAt.Format("2006-01-02 15:04:05"), - }, filteredGrantees, owner.GetDisplayName()) + }, recipientsInstant, owner.GetDisplayName()) if err != nil { - s.logger.Error().Err(err).Str("event", "SpaceUnshared").Msg("could not get render the email") + logger.Error().Err(err).Msg("could not get render the email") return } - s.send(ctx, recipientList) + s.send(ctx, emails) } diff --git a/services/notifications/pkg/service/splitter.go b/services/notifications/pkg/service/splitter.go new file mode 100644 index 0000000000..1991223223 --- /dev/null +++ b/services/notifications/pkg/service/splitter.go @@ -0,0 +1,61 @@ +package service + +import ( + "context" + user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + "github.com/owncloud/ocis/v2/ocis-pkg/log" + "github.com/owncloud/ocis/v2/ocis-pkg/middleware" + settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0" + "github.com/owncloud/ocis/v2/services/settings/pkg/store/defaults" + "github.com/pkg/errors" + micrometadata "go-micro.dev/v4/metadata" +) + +type intervalSplitter struct { + log log.Logger + valueClient settingssvc.ValueService + persistenceService userEventStore +} + +func newIntervalSplitter(l log.Logger, vc settingssvc.ValueService) *intervalSplitter { + return &intervalSplitter{log: l, valueClient: vc} +} + +// execute splits users into 3 lists depending on their email sending interval settings +func (s intervalSplitter) execute(ctx context.Context, users []*user.User) (instant, daily, weekly []*user.User) { + for _, u := range users { + userId := u.GetId().GetOpaqueId() + interval, err := getEmailSendingInterval(ctx, s.valueClient, userId) + if err != nil { + s.log.Error().Err(err).Str("userId", userId).Msg("cannot get user email sending interval") + instant = append(instant, u) + } else if interval == "instant" { + instant = append(instant, u) + } else if interval == intervalDaily { + daily = append(daily, u) + } else if interval == intervalWeekly { + weekly = append(weekly, u) + } + } + return +} + +func getEmailSendingInterval(ctx context.Context, vc settingssvc.ValueService, userId string) (string, error) { + resp, err := vc.GetValueByUniqueIdentifiers( + micrometadata.Set(ctx, middleware.AccountID, userId), + &settingssvc.GetValueByUniqueIdentifiersRequest{ + AccountUuid: userId, + SettingId: defaults.SettingUUIDProfileEmailSendingInterval, + }, + ) + + if err != nil { + return "", err + } + + val := resp.GetValue().GetValue().GetStringValue() + if val == "" { + return "", errors.New("email sending interval is empty") + } + return val, nil +} diff --git a/services/notifications/pkg/service/splitter_test.go b/services/notifications/pkg/service/splitter_test.go new file mode 100644 index 0000000000..14832121fd --- /dev/null +++ b/services/notifications/pkg/service/splitter_test.go @@ -0,0 +1,196 @@ +package service + +import ( + "context" + user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + "github.com/owncloud/ocis/v2/ocis-pkg/log" + settingsmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/settings/v0" + settings "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0" + v0 "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "go-micro.dev/v4/client" + "strings" + "testing" +) + +func Test_intervalSplitter_execute(t *testing.T) { + type fields struct { + log log.Logger + valueClient v0.ValueService + } + type args struct { + ctx context.Context + users []*user.User + settingId string + } + tests := []struct { + name string + fields fields + args args + wantInstant []*user.User + wantDaily []*user.User + wantWeekly []*user.User + }{ + {"no connection to ValueService", + fields{ + log: testLogger, + valueClient: settings.MockValueService{ + GetValueByUniqueIdentifiersFunc: func(ctx context.Context, req *settings.GetValueByUniqueIdentifiersRequest, opts ...client.CallOption) (*settings.GetValueResponse, error) { + return nil, errors.New("no connection to ValueService") + }}}, args{ + ctx: context.TODO(), + users: newUsers("foo"), + settingId: "", + }, + newUsers("foo"), []*user.User(nil), []*user.User(nil), + }, + {"no setting in ValueService response", + fields{ + log: testLogger, + valueClient: settings.MockValueService{ + GetValueByUniqueIdentifiersFunc: func(ctx context.Context, req *settings.GetValueByUniqueIdentifiersRequest, opts ...client.CallOption) (*settings.GetValueResponse, error) { + return &settings.GetValueResponse{}, nil + }}}, + args{ + ctx: context.TODO(), + users: newUsers("foo"), + settingId: "", + }, + newUsers("foo"), []*user.User(nil), []*user.User(nil), + }, + {"ValueService nil response", + fields{ + log: testLogger, + valueClient: settings.MockValueService{ + GetValueByUniqueIdentifiersFunc: func(ctx context.Context, req *settings.GetValueByUniqueIdentifiersRequest, opts ...client.CallOption) (*settings.GetValueResponse, error) { + return nil, nil + }}}, + args{ + ctx: context.TODO(), + users: newUsers("foo"), + settingId: "", + }, + newUsers("foo"), []*user.User(nil), []*user.User(nil), + }, + {"input users nil", + fields{ + log: testLogger, + valueClient: settings.MockValueService{ + GetValueByUniqueIdentifiersFunc: func(ctx context.Context, req *settings.GetValueByUniqueIdentifiersRequest, opts ...client.CallOption) (*settings.GetValueResponse, error) { + return nil, nil + }}, + }, + args{ + ctx: context.TODO(), + users: nil, + }, + []*user.User(nil), []*user.User(nil), []*user.User(nil), + }, + {"interval never", + fields{ + log: testLogger, + valueClient: newStringValueMockValueService("never"), + }, + args{ + ctx: context.TODO(), + users: newUsers("foo"), + }, + []*user.User(nil), []*user.User(nil), []*user.User(nil), + }, + {"interval instant", + fields{ + log: testLogger, + valueClient: newStringValueMockValueService("instant"), + }, + args{ + ctx: context.TODO(), + users: newUsers("foo"), + }, + newUsers("foo"), []*user.User(nil), []*user.User(nil), + }, + {"interval daily", + fields{ + log: testLogger, + valueClient: newStringValueMockValueService("daily"), + }, + args{ + ctx: context.TODO(), + users: newUsers("foo"), + }, + []*user.User(nil), newUsers("foo"), []*user.User(nil), + }, + {"interval weekly", + fields{ + log: testLogger, + valueClient: newStringValueMockValueService("weekly"), + }, + args{ + ctx: context.TODO(), + users: newUsers("foo"), + }, + []*user.User(nil), []*user.User(nil), newUsers("foo"), + }, + {"multiple users and intervals", + fields{ + log: testLogger, + valueClient: settings.MockValueService{ + GetValueByUniqueIdentifiersFunc: func(ctx context.Context, req *settings.GetValueByUniqueIdentifiersRequest, opts ...client.CallOption) (*settings.GetValueResponse, error) { + if strings.Contains(req.AccountUuid, "never") { + return newGetValueResponseStringValue("never"), nil + } else if strings.Contains(req.AccountUuid, "instant") { + return newGetValueResponseStringValue("instant"), nil + } else if strings.Contains(req.AccountUuid, "daily") { + return newGetValueResponseStringValue("daily"), nil + } else if strings.Contains(req.AccountUuid, "weekly") { + return newGetValueResponseStringValue("weekly"), nil + } + return nil, nil + }}, + }, + args{ + ctx: context.TODO(), + users: newUsers("never1", "instant1", "daily1", "weekly1", "never2", "instant2", "daily2", "weekly2"), + }, + newUsers("instant1", "instant2"), newUsers("daily1", "daily2"), newUsers("weekly1", "weekly2"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := intervalSplitter{ + log: tt.fields.log, + valueClient: tt.fields.valueClient, + } + gotInstant, gotDaily, gotWeekly := s.execute(tt.args.ctx, tt.args.users) + assert.Equalf(t, tt.wantInstant, gotInstant, "execute(%v, %v, %v)", tt.args.ctx, tt.args.users) + assert.Equalf(t, tt.wantDaily, gotDaily, "execute(%v, %v, %v)", tt.args.ctx, tt.args.users) + assert.Equalf(t, tt.wantWeekly, gotWeekly, "execute(%v, %v, %v)", tt.args.ctx, tt.args.users) + }) + } +} + +func newStringValueMockValueService(strVal string) settings.ValueService { + return settings.MockValueService{ + GetValueByUniqueIdentifiersFunc: func(ctx context.Context, req *settings.GetValueByUniqueIdentifiersRequest, opts ...client.CallOption) (*settings.GetValueResponse, error) { + return newGetValueResponseStringValue(strVal), nil + }, + } +} + +func newGetValueResponseStringValue(strVal string) *settings.GetValueResponse { + return &settings.GetValueResponse{Value: &settingsmsg.ValueWithIdentifier{ + Value: &settingsmsg.Value{ + Value: &settingsmsg.Value_StringValue{ + StringValue: strVal, + }, + }, + }} +} + +func newUsers(ids ...string) []*user.User { + var users []*user.User + for _, s := range ids { + users = append(users, &user.User{Id: &user.UserId{OpaqueId: s}}) + } + return users +}