Merge pull request #9467 from kobergj/ActivitylogFixes

Activitylog fixes
This commit is contained in:
kobergj
2024-06-27 11:13:46 +02:00
committed by GitHub
5 changed files with 71 additions and 10 deletions
@@ -0,0 +1,5 @@
Enhancement: Various fixes for the activitylog service
First round of fixes to make the activitylog service more robust and reliable.
https://github.com/owncloud/ocis/pull/9467
@@ -33,6 +33,7 @@ var _registeredEvents = []events.Unmarshaller{
events.FileTouched{},
events.ContainerCreated{},
events.ItemTrashed{},
events.ItemPurged{},
events.ItemMoved{},
events.ShareCreated{},
events.ShareRemoved{},
+28 -9
View File
@@ -6,6 +6,7 @@ import (
"errors"
"net/http"
"path/filepath"
"slices"
"strconv"
"strings"
"time"
@@ -17,6 +18,7 @@ import (
"github.com/cs3org/reva/v2/pkg/utils"
"google.golang.org/grpc/metadata"
libregraph "github.com/owncloud/libre-graph-api-go"
"github.com/owncloud/ocis/v2/ocis-pkg/ast"
"github.com/owncloud/ocis/v2/ocis-pkg/kql"
"github.com/owncloud/ocis/v2/ocis-pkg/l10n"
@@ -51,7 +53,7 @@ func (s *ActivitylogService) HandleGetItemActivities(w http.ResponseWriter, r *h
return
}
rid, limit, rawActivityAccepted, activityAccepted, err := s.getFilters(r.URL.Query().Get("kql"))
rid, limit, rawActivityAccepted, activityAccepted, sort, err := s.getFilters(r.URL.Query().Get("kql"))
if err != nil {
s.log.Info().Str("query", r.URL.Query().Get("kql")).Err(err).Msg("error getting filters")
_, _ = w.Write([]byte(err.Error()))
@@ -83,7 +85,7 @@ func (s *ActivitylogService) HandleGetItemActivities(w http.ResponseWriter, r *h
return
}
var resp GetActivitiesResponse
resp := GetActivitiesResponse{Activities: make([]libregraph.Activity, 0, len(evRes.GetEvents()))}
for _, e := range evRes.GetEvents() {
delete(toDelete, e.GetId())
@@ -179,6 +181,8 @@ func (s *ActivitylogService) HandleGetItemActivities(w http.ResponseWriter, r *h
}()
}
sort(resp.Activities)
b, err := json.Marshal(resp)
if err != nil {
s.log.Error().Err(err).Msg("error marshalling activities")
@@ -210,15 +214,17 @@ func (s *ActivitylogService) unwrapEvent(e *ehmsg.Event) interface{} {
return einterface
}
func (s *ActivitylogService) getFilters(query string) (*provider.ResourceId, int, func(RawActivity) bool, func(*ehmsg.Event) bool, error) {
func (s *ActivitylogService) getFilters(query string) (*provider.ResourceId, int, func(RawActivity) bool, func(*ehmsg.Event) bool, func([]libregraph.Activity), error) {
qast, err := kql.Builder{}.Build(query)
if err != nil {
return nil, 0, nil, nil, err
return nil, 0, nil, nil, nil, err
}
prefilters := make([]func(RawActivity) bool, 0)
postfilters := make([]func(*ehmsg.Event) bool, 0)
sortby := func(_ []libregraph.Activity) {}
var (
itemID string
limit int
@@ -233,7 +239,7 @@ func (s *ActivitylogService) getFilters(query string) (*provider.ResourceId, int
case "depth":
depth, err := strconv.Atoi(v.Value)
if err != nil {
return nil, limit, nil, nil, err
return nil, limit, nil, nil, sortby, err
}
prefilters = append(prefilters, func(a RawActivity) bool {
@@ -242,10 +248,19 @@ func (s *ActivitylogService) getFilters(query string) (*provider.ResourceId, int
case "limit":
l, err := strconv.Atoi(v.Value)
if err != nil {
return nil, limit, nil, nil, err
return nil, limit, nil, nil, sortby, err
}
limit = l
case "sort":
switch v.Value {
case "asc":
// nothing to do - already ascending
case "desc":
sortby = func(activities []libregraph.Activity) {
slices.Reverse(activities)
}
}
}
case *ast.DateTimeNode:
switch v.Operator.Value {
@@ -260,14 +275,18 @@ func (s *ActivitylogService) getFilters(query string) (*provider.ResourceId, int
}
case *ast.OperatorNode:
if v.Value != "AND" {
return nil, limit, nil, nil, errors.New("only AND operator is supported")
return nil, limit, nil, nil, sortby, errors.New("only AND operator is supported")
}
}
}
rid, err := storagespace.ParseID(itemID)
if err != nil {
return nil, limit, nil, nil, err
return nil, limit, nil, nil, sortby, err
}
if rid.GetOpaqueId() == "" {
// space root requested - fix format
rid.OpaqueId = rid.GetSpaceId()
}
pref := func(a RawActivity) bool {
for _, f := range prefilters {
@@ -285,7 +304,7 @@ func (s *ActivitylogService) getFilters(query string) (*provider.ResourceId, int
}
return true
}
return &rid, limit, pref, postf, nil
return &rid, limit, pref, postf, sortby, nil
}
// returns true if this is just a rename
+23 -1
View File
@@ -57,6 +57,9 @@ func WithResource(ref *provider.Reference, addSpace bool) ActivityOption {
return func(ctx context.Context, gwc gateway.GatewayAPIClient, vars map[string]interface{}) error {
info, err := utils.GetResource(ctx, ref, gwc)
if err != nil {
vars["resource"] = Resource{
Name: filepath.Base(ref.GetPath()),
}
return err
}
@@ -90,6 +93,10 @@ func WithOldResource(ref *provider.Reference) ActivityOption {
// WithTrashedResource sets the resource variable if the resource is trashed
func WithTrashedResource(ref *provider.Reference, rid *provider.ResourceId) ActivityOption {
return func(ctx context.Context, gwc gateway.GatewayAPIClient, vars map[string]interface{}) error {
vars["resource"] = Resource{
Name: filepath.Base(ref.GetPath()),
}
resp, err := gwc.ListRecycle(ctx, &provider.ListRecycleRequest{
Ref: ref,
})
@@ -122,6 +129,10 @@ func WithUser(uid *user.UserId, username string) ActivityOption {
if username == "" {
u, err := utils.GetUser(uid, gwc)
if err != nil {
vars["user"] = Actor{
ID: uid.GetOpaqueId(),
DisplayName: "DeletedUser",
}
return err
}
username = u.GetUsername()
@@ -143,6 +154,9 @@ func WithSharee(uid *user.UserId, gid *group.GroupId) ActivityOption {
case uid != nil:
u, err := utils.GetUser(uid, gwc)
if err != nil {
vars["sharee"] = Actor{
DisplayName: "DeletedUser",
}
return err
}
@@ -151,6 +165,10 @@ func WithSharee(uid *user.UserId, gid *group.GroupId) ActivityOption {
DisplayName: u.GetUsername(),
}
case gid != nil:
vars["sharee"] = Actor{
ID: gid.GetOpaqueId(),
DisplayName: "DeletedGroup",
}
r, err := gwc.GetGroup(ctx, &group.GetGroupRequest{GroupId: gid})
if err != nil {
return fmt.Errorf("error getting group: %w", err)
@@ -176,6 +194,10 @@ func WithSpace(spaceid *provider.StorageSpaceId) ActivityOption {
return func(ctx context.Context, gwc gateway.GatewayAPIClient, vars map[string]interface{}) error {
s, err := utils.GetSpace(ctx, spaceid.GetOpaqueId(), gwc)
if err != nil {
vars["space"] = Resource{
ID: spaceid.GetOpaqueId(),
Name: "DeletedSpace",
}
return err
}
vars["space"] = Resource{
@@ -209,7 +231,7 @@ func (s *ActivitylogService) GetVars(ctx context.Context, opts ...ActivityOption
vars := make(map[string]interface{})
for _, opt := range opts {
if err := opt(ctx, gwc, vars); err != nil {
return nil, err
s.log.Info().Err(err).Msg("error getting activity vars")
}
}
@@ -104,6 +104,8 @@ func (a *ActivitylogService) Run() {
err = a.AddActivity(ev.Ref, e.ID, utils.TSToTime(ev.Timestamp))
case events.ItemTrashed:
err = a.AddActivityTrashed(ev.ID, ev.Ref, e.ID, utils.TSToTime(ev.Timestamp))
case events.ItemPurged:
err = a.RemoveResource(ev.ID)
case events.ItemMoved:
err = a.AddActivity(ev.Ref, e.ID, utils.TSToTime(ev.Timestamp))
case events.ShareCreated:
@@ -221,6 +223,18 @@ func (a *ActivitylogService) RemoveActivities(rid *provider.ResourceId, toDelete
})
}
// RemoveResource removes the resource from the store
func (a *ActivitylogService) RemoveResource(rid *provider.ResourceId) error {
if rid == nil {
return fmt.Errorf("resource id is required")
}
a.lock.Lock()
defer a.lock.Unlock()
return a.store.Delete(storagespace.FormatResourceID(*rid))
}
func (a *ActivitylogService) activities(rid *provider.ResourceId) ([]RawActivity, error) {
resourceID := storagespace.FormatResourceID(*rid)