mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-02-21 13:09:10 -06:00
Merge pull request #7325 from kobergj/SEEImprovements
Improve SSE Notifications
This commit is contained in:
5
changelog/unreleased/improve-sses.md
Normal file
5
changelog/unreleased/improve-sses.md
Normal file
@@ -0,0 +1,5 @@
|
||||
Enhancement: Improve SSE format
|
||||
|
||||
Improve format of sse notifications
|
||||
|
||||
https://github.com/owncloud/ocis/pull/7325
|
||||
6
services/clientlog/pkg/service/events.go
Normal file
6
services/clientlog/pkg/service/events.go
Normal file
@@ -0,0 +1,6 @@
|
||||
package service
|
||||
|
||||
// FileReadyEvent is emitted when the postprocessing of a file is finished
|
||||
type FileReadyEvent struct {
|
||||
ItemID string `json:"itemid"`
|
||||
}
|
||||
@@ -17,12 +17,6 @@ import (
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
// ClientNotification is the event the clientlog service is sending to the client
|
||||
type ClientNotification struct {
|
||||
Type string
|
||||
ItemID string
|
||||
}
|
||||
|
||||
// ClientlogService is the service responsible for user activities
|
||||
type ClientlogService struct {
|
||||
log log.Logger
|
||||
@@ -93,8 +87,9 @@ func (cl *ClientlogService) processEvent(event events.Event) {
|
||||
}
|
||||
|
||||
var (
|
||||
users []string
|
||||
noti ClientNotification
|
||||
users []string
|
||||
evType string
|
||||
data interface{}
|
||||
)
|
||||
switch e := event.Event.(type) {
|
||||
default:
|
||||
@@ -106,8 +101,10 @@ func (cl *ClientlogService) processEvent(event events.Event) {
|
||||
return
|
||||
}
|
||||
|
||||
noti.Type = "postprocessing-finished"
|
||||
noti.ItemID = storagespace.FormatResourceID(*info.GetId())
|
||||
evType = "postprocessing-finished"
|
||||
data = FileReadyEvent{
|
||||
ItemID: storagespace.FormatResourceID(*info.GetId()),
|
||||
}
|
||||
|
||||
users, err = utils.GetSpaceMembers(ctx, info.GetSpace().GetId().GetOpaqueId(), gwc, utils.ViewerRole)
|
||||
}
|
||||
@@ -119,22 +116,22 @@ func (cl *ClientlogService) processEvent(event events.Event) {
|
||||
|
||||
// II) instruct sse service to send the information
|
||||
for _, id := range users {
|
||||
if err := cl.sendSSE(id, noti); err != nil {
|
||||
if err := cl.sendSSE(id, evType, data); err != nil {
|
||||
cl.log.Error().Err(err).Str("userID", id).Str("eventid", event.ID).Msg("failed to store event for user")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (cl *ClientlogService) sendSSE(userid string, noti ClientNotification) error {
|
||||
b, err := json.Marshal(noti)
|
||||
func (cl *ClientlogService) sendSSE(userid string, evType string, data interface{}) error {
|
||||
b, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return events.Publish(context.Background(), cl.publisher, events.SendSSE{
|
||||
UserID: userid,
|
||||
Type: "clientlog-notification",
|
||||
Type: evType,
|
||||
Message: b,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
|
||||
revactx "github.com/cs3org/reva/v2/pkg/ctx"
|
||||
@@ -12,6 +13,12 @@ import (
|
||||
"github.com/owncloud/ocis/v2/services/sse/pkg/config"
|
||||
)
|
||||
|
||||
// ServerSentEvent is the data structure sent by the sse service
|
||||
type ServerSentEvent struct {
|
||||
Type string `json:"type"`
|
||||
Data json.RawMessage `json:"data"`
|
||||
}
|
||||
|
||||
// SSE defines implements the business logic for Service.
|
||||
type SSE struct {
|
||||
c *config.Config
|
||||
@@ -45,20 +52,25 @@ func (s SSE) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
// ListenForEvents listens for events
|
||||
func (s SSE) ListenForEvents() error {
|
||||
func (s SSE) ListenForEvents() {
|
||||
for e := range s.evChannel {
|
||||
switch ev := e.Event.(type) {
|
||||
default:
|
||||
s.l.Error().Interface("event", ev).Msg("unhandled event")
|
||||
case events.SendSSE:
|
||||
b, err := json.Marshal(ServerSentEvent{
|
||||
Type: ev.Type,
|
||||
Data: ev.Message,
|
||||
})
|
||||
if err != nil {
|
||||
s.l.Error().Interface("event", ev).Msg("cannot marshal event")
|
||||
continue
|
||||
}
|
||||
s.sse.Publish(ev.UserID, &sse.Event{
|
||||
Event: []byte(ev.Type),
|
||||
Data: ev.Message,
|
||||
Data: b,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// HandleSSE is the GET handler for events
|
||||
|
||||
Reference in New Issue
Block a user