mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-01-05 19:59:37 -06:00
feat(activitylog): add api
Signed-off-by: jkoberg <jkoberg@owncloud.com>
This commit is contained in:
@@ -2,4 +2,5 @@ Enhancement: Activitylog Service
|
||||
|
||||
Adds a new service `activitylog` which stores events (activities) per resource. This data can be retrieved by clients to show item activities
|
||||
|
||||
https://github.com/owncloud/ocis/pull/9360
|
||||
https://github.com/owncloud/ocis/pull/9327
|
||||
|
||||
@@ -14,13 +14,15 @@ import (
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/handlers"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/registry"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/service/debug"
|
||||
ogrpc "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/tracing"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/version"
|
||||
ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
|
||||
"github.com/owncloud/ocis/v2/services/activitylog/pkg/config"
|
||||
"github.com/owncloud/ocis/v2/services/activitylog/pkg/config/parser"
|
||||
"github.com/owncloud/ocis/v2/services/activitylog/pkg/logging"
|
||||
"github.com/owncloud/ocis/v2/services/activitylog/pkg/metrics"
|
||||
"github.com/owncloud/ocis/v2/services/activitylog/pkg/service"
|
||||
"github.com/owncloud/ocis/v2/services/activitylog/pkg/server/http"
|
||||
"github.com/urfave/cli/v2"
|
||||
microstore "go-micro.dev/v4/store"
|
||||
)
|
||||
@@ -109,15 +111,27 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
return fmt.Errorf("could not get reva client selector: %s", err)
|
||||
}
|
||||
|
||||
grpcClient, err := ogrpc.NewClient(
|
||||
append(ogrpc.GetClientOptions(cfg.GRPCClientTLS), ogrpc.WithTraceProvider(tracerProvider))...,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
hClient := ehsvc.NewEventHistoryService("com.owncloud.api.eventhistory", grpcClient)
|
||||
|
||||
{
|
||||
svc, err := service.New(
|
||||
service.Logger(logger),
|
||||
service.Config(cfg),
|
||||
service.TraceProvider(tracerProvider),
|
||||
service.Stream(evStream),
|
||||
service.RegisteredEvents(_registeredEvents),
|
||||
service.Store(evStore),
|
||||
service.GatewaySelector(gatewaySelector),
|
||||
svc, err := http.Server(
|
||||
http.Logger(logger),
|
||||
http.Config(cfg),
|
||||
http.Context(ctx), // NOTE: not passing this "option" leads to a panic in go-micro
|
||||
http.TraceProvider(tracerProvider),
|
||||
http.Stream(evStream),
|
||||
http.RegisteredEvents(_registeredEvents),
|
||||
http.Store(evStore),
|
||||
http.GatewaySelector(gatewaySelector),
|
||||
http.History(hClient),
|
||||
http.RegisteredEvents(_registeredEvents),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
|
||||
@@ -23,6 +23,9 @@ type Config struct {
|
||||
RevaGateway string `yaml:"reva_gateway" env:"OCIS_REVA_GATEWAY" desc:"CS3 gateway used to look up user metadata" introductionVersion:"5.0"`
|
||||
GRPCClientTLS *shared.GRPCClientTLS `yaml:"grpc_client_tls"`
|
||||
|
||||
HTTP HTTP `yaml:"http"`
|
||||
TokenManager *TokenManager `yaml:"token_manager"`
|
||||
|
||||
ServiceAccount ServiceAccount `yaml:"service_account"`
|
||||
|
||||
Context context.Context `yaml:"-"`
|
||||
@@ -56,3 +59,25 @@ type ServiceAccount struct {
|
||||
ServiceAccountID string `yaml:"service_account_id" env:"OCIS_SERVICE_ACCOUNT_ID;ACTIVITYLOG_SERVICE_ACCOUNT_ID" desc:"The ID of the service account the service should use. See the 'auth-service' service description for more details." introductionVersion:"5.0"`
|
||||
ServiceAccountSecret string `yaml:"service_account_secret" env:"OCIS_SERVICE_ACCOUNT_SECRET;ACTIVITYOG_SERVICE_ACCOUNT_SECRET" desc:"The service account secret." introductionVersion:"5.0"`
|
||||
}
|
||||
|
||||
// CORS defines the available cors configuration.
|
||||
type CORS struct {
|
||||
AllowedOrigins []string `yaml:"allow_origins" env:"OCIS_CORS_ALLOW_ORIGINS;ACTIVITYLOG_CORS_ALLOW_ORIGINS" desc:"A list of allowed CORS origins. See following chapter for more details: *Access-Control-Allow-Origin* at https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Access-Control-Allow-Origin. See the Environment Variable Types description for more details." introductionVersion:"pre5.0"`
|
||||
AllowedMethods []string `yaml:"allow_methods" env:"OCIS_CORS_ALLOW_METHODS;ACTIVITYLOG_CORS_ALLOW_METHODS" desc:"A list of allowed CORS methods. See following chapter for more details: *Access-Control-Request-Method* at https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Access-Control-Request-Method. See the Environment Variable Types description for more details." introductionVersion:"pre5.0"`
|
||||
AllowedHeaders []string `yaml:"allow_headers" env:"OCIS_CORS_ALLOW_HEADERS;ACTIVITYLOG_CORS_ALLOW_HEADERS" desc:"A list of allowed CORS headers. See following chapter for more details: *Access-Control-Request-Headers* at https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Access-Control-Request-Headers. See the Environment Variable Types description for more details." introductionVersion:"pre5.0"`
|
||||
AllowCredentials bool `yaml:"allow_credentials" env:"OCIS_CORS_ALLOW_CREDENTIALS;ACTIVITYLOG_CORS_ALLOW_CREDENTIALS" desc:"Allow credentials for CORS.See following chapter for more details: *Access-Control-Allow-Credentials* at https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Access-Control-Allow-Credentials." introductionVersion:"pre5.0"`
|
||||
}
|
||||
|
||||
// HTTP defines the available http configuration.
|
||||
type HTTP struct {
|
||||
Addr string `yaml:"addr" env:"ACTIVITYLOG_HTTP_ADDR" desc:"The bind address of the HTTP service." introductionVersion:"pre5.0"`
|
||||
Namespace string `yaml:"-"`
|
||||
Root string `yaml:"root" env:"ACTIVITYLOG_HTTP_ROOT" desc:"Subdirectory that serves as the root for this HTTP service." introductionVersion:"pre5.0"`
|
||||
CORS CORS `yaml:"cors"`
|
||||
TLS shared.HTTPServiceTLS `yaml:"tls"`
|
||||
}
|
||||
|
||||
// TokenManager is the config for using the reva token manager
|
||||
type TokenManager struct {
|
||||
JWTSecret string `yaml:"jwt_secret" env:"OCIS_JWT_SECRET;ACTIVITYLOG_JWT_SECRET" desc:"The secret to mint and validate jwt tokens." introductionVersion:"pre5.0"`
|
||||
}
|
||||
|
||||
@@ -38,6 +38,17 @@ func DefaultConfig() *config.Config {
|
||||
Table: "",
|
||||
},
|
||||
RevaGateway: shared.DefaultRevaConfig().Address,
|
||||
HTTP: config.HTTP{
|
||||
Addr: "127.0.0.1:0",
|
||||
Root: "/",
|
||||
Namespace: "com.owncloud.web",
|
||||
CORS: config.CORS{
|
||||
AllowedOrigins: []string{"*"},
|
||||
AllowedMethods: []string{"GET"},
|
||||
AllowedHeaders: []string{"Authorization", "Origin", "Content-Type", "Accept", "X-Requested-With", "X-Request-Id", "Ocs-Apirequest"},
|
||||
AllowCredentials: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -55,6 +66,22 @@ func EnsureDefaults(cfg *config.Config) {
|
||||
cfg.Log = &config.Log{}
|
||||
}
|
||||
|
||||
if cfg.GRPCClientTLS == nil && cfg.Commons != nil {
|
||||
cfg.GRPCClientTLS = structs.CopyOrZeroValue(cfg.Commons.GRPCClientTLS)
|
||||
}
|
||||
|
||||
if cfg.TokenManager == nil && cfg.Commons != nil && cfg.Commons.TokenManager != nil {
|
||||
cfg.TokenManager = &config.TokenManager{
|
||||
JWTSecret: cfg.Commons.TokenManager.JWTSecret,
|
||||
}
|
||||
} else if cfg.TokenManager == nil {
|
||||
cfg.TokenManager = &config.TokenManager{}
|
||||
}
|
||||
|
||||
if cfg.Commons != nil {
|
||||
cfg.HTTP.TLS = cfg.Commons.HTTPServiceTLS
|
||||
}
|
||||
|
||||
// provide with defaults for shared tracing, since we need a valid destination address for "envdecode".
|
||||
if cfg.Tracing == nil && cfg.Commons != nil && cfg.Commons.Tracing != nil {
|
||||
cfg.Tracing = &config.Tracing{
|
||||
@@ -67,9 +94,6 @@ func EnsureDefaults(cfg *config.Config) {
|
||||
cfg.Tracing = &config.Tracing{}
|
||||
}
|
||||
|
||||
if cfg.GRPCClientTLS == nil && cfg.Commons != nil {
|
||||
cfg.GRPCClientTLS = structs.CopyOrZeroValue(cfg.Commons.GRPCClientTLS)
|
||||
}
|
||||
}
|
||||
|
||||
// Sanitize sanitizes the config
|
||||
|
||||
130
services/activitylog/pkg/server/http/option.go
Normal file
130
services/activitylog/pkg/server/http/option.go
Normal file
@@ -0,0 +1,130 @@
|
||||
package http
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
|
||||
"github.com/cs3org/reva/v2/pkg/events"
|
||||
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/log"
|
||||
ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
|
||||
"github.com/owncloud/ocis/v2/services/activitylog/pkg/config"
|
||||
"github.com/owncloud/ocis/v2/services/activitylog/pkg/metrics"
|
||||
"github.com/urfave/cli/v2"
|
||||
"go-micro.dev/v4/store"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
// Option defines a single option function.
|
||||
type Option func(o *Options)
|
||||
|
||||
// Options defines the available options for this package.
|
||||
type Options struct {
|
||||
Logger log.Logger
|
||||
Context context.Context
|
||||
Config *config.Config
|
||||
Metrics *metrics.Metrics
|
||||
Flags []cli.Flag
|
||||
Namespace string
|
||||
Store store.Store
|
||||
Stream events.Stream
|
||||
GatewaySelector pool.Selectable[gateway.GatewayAPIClient]
|
||||
TraceProvider trace.TracerProvider
|
||||
HistoryClient ehsvc.EventHistoryService
|
||||
RegisteredEvents []events.Unmarshaller
|
||||
}
|
||||
|
||||
// newOptions initializes the available default options.
|
||||
func newOptions(opts ...Option) Options {
|
||||
opt := Options{}
|
||||
|
||||
for _, o := range opts {
|
||||
o(&opt)
|
||||
}
|
||||
|
||||
return opt
|
||||
}
|
||||
|
||||
// Logger provides a function to set the logger option.
|
||||
func Logger(val log.Logger) Option {
|
||||
return func(o *Options) {
|
||||
o.Logger = val
|
||||
}
|
||||
}
|
||||
|
||||
// Context provides a function to set the context option.
|
||||
func Context(val context.Context) Option {
|
||||
return func(o *Options) {
|
||||
o.Context = val
|
||||
}
|
||||
}
|
||||
|
||||
// Config provides a function to set the config option.
|
||||
func Config(val *config.Config) Option {
|
||||
return func(o *Options) {
|
||||
o.Config = val
|
||||
}
|
||||
}
|
||||
|
||||
// Metrics provides a function to set the metrics option.
|
||||
func Metrics(val *metrics.Metrics) Option {
|
||||
return func(o *Options) {
|
||||
o.Metrics = val
|
||||
}
|
||||
}
|
||||
|
||||
// Flags provides a function to set the flags option.
|
||||
func Flags(val []cli.Flag) Option {
|
||||
return func(o *Options) {
|
||||
o.Flags = append(o.Flags, val...)
|
||||
}
|
||||
}
|
||||
|
||||
// Namespace provides a function to set the Namespace option.
|
||||
func Namespace(val string) Option {
|
||||
return func(o *Options) {
|
||||
o.Namespace = val
|
||||
}
|
||||
}
|
||||
|
||||
// Store provides a function to configure the store
|
||||
func Store(store store.Store) Option {
|
||||
return func(o *Options) {
|
||||
o.Store = store
|
||||
}
|
||||
}
|
||||
|
||||
// Stream provides a function to configure the stream
|
||||
func Stream(stream events.Stream) Option {
|
||||
return func(o *Options) {
|
||||
o.Stream = stream
|
||||
}
|
||||
}
|
||||
|
||||
// GatewaySelector provides a function to configure the gateway client selector
|
||||
func GatewaySelector(gatewaySelector pool.Selectable[gateway.GatewayAPIClient]) Option {
|
||||
return func(o *Options) {
|
||||
o.GatewaySelector = gatewaySelector
|
||||
}
|
||||
}
|
||||
|
||||
// History provides a function to configure the event history client
|
||||
func History(h ehsvc.EventHistoryService) Option {
|
||||
return func(o *Options) {
|
||||
o.HistoryClient = h
|
||||
}
|
||||
}
|
||||
|
||||
// RegisteredEvents provides a function to register events
|
||||
func RegisteredEvents(evs []events.Unmarshaller) Option {
|
||||
return func(o *Options) {
|
||||
o.RegisteredEvents = evs
|
||||
}
|
||||
}
|
||||
|
||||
// TraceProvider provides a function to set the TracerProvider option
|
||||
func TraceProvider(val trace.TracerProvider) Option {
|
||||
return func(o *Options) {
|
||||
o.TraceProvider = val
|
||||
}
|
||||
}
|
||||
100
services/activitylog/pkg/server/http/server.go
Normal file
100
services/activitylog/pkg/server/http/server.go
Normal file
@@ -0,0 +1,100 @@
|
||||
package http
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
stdhttp "net/http"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
chimiddleware "github.com/go-chi/chi/v5/middleware"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/account"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/cors"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/middleware"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/service/http"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/tracing"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/version"
|
||||
svc "github.com/owncloud/ocis/v2/services/activitylog/pkg/service"
|
||||
"github.com/riandyrn/otelchi"
|
||||
"go-micro.dev/v4"
|
||||
)
|
||||
|
||||
// Service is the service interface
|
||||
type Service interface{}
|
||||
|
||||
// Server initializes the http service and server.
|
||||
func Server(opts ...Option) (http.Service, error) {
|
||||
options := newOptions(opts...)
|
||||
|
||||
service, err := http.NewService(
|
||||
http.TLSConfig(options.Config.HTTP.TLS),
|
||||
http.Logger(options.Logger),
|
||||
http.Namespace(options.Config.HTTP.Namespace),
|
||||
http.Name(options.Config.Service.Name),
|
||||
http.Version(version.GetString()),
|
||||
http.Address(options.Config.HTTP.Addr),
|
||||
http.Context(options.Context),
|
||||
http.Flags(options.Flags...),
|
||||
http.TraceProvider(options.TraceProvider),
|
||||
)
|
||||
if err != nil {
|
||||
options.Logger.Error().
|
||||
Err(err).
|
||||
Msg("Error initializing http service")
|
||||
return http.Service{}, fmt.Errorf("could not initialize http service: %w", err)
|
||||
}
|
||||
|
||||
middlewares := []func(stdhttp.Handler) stdhttp.Handler{
|
||||
chimiddleware.RequestID,
|
||||
middleware.Version(
|
||||
options.Config.Service.Name,
|
||||
version.GetString(),
|
||||
),
|
||||
middleware.Logger(
|
||||
options.Logger,
|
||||
),
|
||||
middleware.ExtractAccountUUID(
|
||||
account.Logger(options.Logger),
|
||||
account.JWTSecret(options.Config.TokenManager.JWTSecret),
|
||||
),
|
||||
middleware.Cors(
|
||||
cors.Logger(options.Logger),
|
||||
cors.AllowedOrigins(options.Config.HTTP.CORS.AllowedOrigins),
|
||||
cors.AllowedMethods(options.Config.HTTP.CORS.AllowedMethods),
|
||||
cors.AllowedHeaders(options.Config.HTTP.CORS.AllowedHeaders),
|
||||
cors.AllowCredentials(options.Config.HTTP.CORS.AllowCredentials),
|
||||
),
|
||||
}
|
||||
|
||||
mux := chi.NewMux()
|
||||
mux.Use(middlewares...)
|
||||
|
||||
mux.Use(
|
||||
otelchi.Middleware(
|
||||
"actitivylog",
|
||||
otelchi.WithChiRoutes(mux),
|
||||
otelchi.WithTracerProvider(options.TraceProvider),
|
||||
otelchi.WithPropagators(tracing.GetPropagator()),
|
||||
),
|
||||
)
|
||||
|
||||
handle, err := svc.New(
|
||||
svc.Logger(options.Logger),
|
||||
svc.Stream(options.Stream),
|
||||
svc.Mux(mux),
|
||||
svc.Store(options.Store),
|
||||
svc.Config(options.Config),
|
||||
svc.GatewaySelector(options.GatewaySelector),
|
||||
svc.TraceProvider(options.TraceProvider),
|
||||
svc.HistoryClient(options.HistoryClient),
|
||||
svc.RegisteredEvents(options.RegisteredEvents),
|
||||
)
|
||||
if err != nil {
|
||||
return http.Service{}, err
|
||||
}
|
||||
|
||||
if err := micro.RegisterHandler(service.Server(), handle); err != nil {
|
||||
return http.Service{}, err
|
||||
}
|
||||
|
||||
return service, nil
|
||||
}
|
||||
114
services/activitylog/pkg/service/http.go
Normal file
114
services/activitylog/pkg/service/http.go
Normal file
@@ -0,0 +1,114 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
|
||||
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
|
||||
"github.com/cs3org/reva/v2/pkg/events"
|
||||
"github.com/cs3org/reva/v2/pkg/storagespace"
|
||||
"github.com/go-chi/chi/v5"
|
||||
ehmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/eventhistory/v0"
|
||||
ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
|
||||
"github.com/owncloud/ocis/v2/services/graph/pkg/errorcode"
|
||||
)
|
||||
|
||||
// ServeHTTP implements the http.Handler interface.
|
||||
func (s *ActivitylogService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
s.mux.ServeHTTP(w, r)
|
||||
}
|
||||
|
||||
// HandleGetItemActivities handles the request to get the activities of an item.
|
||||
func (s *ActivitylogService) HandleGetItemActivities(w http.ResponseWriter, r *http.Request) {
|
||||
// TODO: Compare driveid with itemid to avoid bad requests
|
||||
rid, err := parseIDParam(r, "item-id")
|
||||
if err != nil {
|
||||
s.log.Info().Err(err).Msg("invalid resource id")
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
raw, err := s.Activities(&rid)
|
||||
if err != nil {
|
||||
s.log.Error().Err(err).Msg("error getting activities")
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
ids := make([]string, 0, len(raw))
|
||||
for _, a := range raw {
|
||||
// TODO: Filter by depth and timestamp
|
||||
ids = append(ids, a.EventID)
|
||||
}
|
||||
|
||||
fmt.Println("IDS:", ids)
|
||||
|
||||
evRes, err := s.evHistory.GetEvents(r.Context(), &ehsvc.GetEventsRequest{Ids: ids})
|
||||
if err != nil {
|
||||
s.log.Error().Err(err).Msg("error getting events")
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: compare returned events with initial list and remove missing ones
|
||||
|
||||
fmt.Println("EVENTS:", evRes.GetEvents())
|
||||
|
||||
var acts []Activity
|
||||
for _, e := range evRes.GetEvents() {
|
||||
// FIXME: Should all users get all events? If not we can filter here
|
||||
|
||||
switch ev := s.unwrapEvent(e).(type) {
|
||||
case nil:
|
||||
// error already logged in unwrapEvent
|
||||
continue
|
||||
case events.UploadReady:
|
||||
act := UploadReady(e.Id, ev)
|
||||
acts = append(acts, act)
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Println("ACTIVITIES:", acts)
|
||||
|
||||
b, err := json.Marshal(acts)
|
||||
if err != nil {
|
||||
s.log.Error().Err(err).Msg("error marshalling activities")
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.Write(b)
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
func (s *ActivitylogService) unwrapEvent(e *ehmsg.Event) interface{} {
|
||||
etype, ok := s.registeredEvents[e.GetType()]
|
||||
if !ok {
|
||||
s.log.Error().Str("eventid", e.GetId()).Str("eventtype", e.GetType()).Msg("event not registered")
|
||||
return nil
|
||||
}
|
||||
|
||||
einterface, err := etype.Unmarshal(e.GetEvent())
|
||||
if err != nil {
|
||||
s.log.Error().Str("eventid", e.GetId()).Str("eventtype", e.GetType()).Msg("failed to umarshal event")
|
||||
return nil
|
||||
}
|
||||
|
||||
return einterface
|
||||
}
|
||||
|
||||
// TODO: I found this on graph service. We should move it to `utils` pkg so both services can use it.
|
||||
func parseIDParam(r *http.Request, param string) (provider.ResourceId, error) {
|
||||
driveID, err := url.PathUnescape(chi.URLParam(r, param))
|
||||
if err != nil {
|
||||
return provider.ResourceId{}, errorcode.New(errorcode.InvalidRequest, err.Error())
|
||||
}
|
||||
|
||||
id, err := storagespace.ParseID(driveID)
|
||||
if err != nil {
|
||||
return provider.ResourceId{}, errorcode.New(errorcode.InvalidRequest, err.Error())
|
||||
}
|
||||
return id, nil
|
||||
}
|
||||
@@ -4,7 +4,9 @@ import (
|
||||
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
|
||||
"github.com/cs3org/reva/v2/pkg/events"
|
||||
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/log"
|
||||
ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
|
||||
"github.com/owncloud/ocis/v2/services/activitylog/pkg/config"
|
||||
microstore "go-micro.dev/v4/store"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
@@ -22,6 +24,8 @@ type Options struct {
|
||||
RegisteredEvents []events.Unmarshaller
|
||||
Store microstore.Store
|
||||
GatewaySelector pool.Selectable[gateway.GatewayAPIClient]
|
||||
Mux *chi.Mux
|
||||
HistoryClient ehsvc.EventHistoryService
|
||||
}
|
||||
|
||||
// Logger configures a logger for the activitylog service
|
||||
@@ -72,3 +76,17 @@ func GatewaySelector(gatewaySelector pool.Selectable[gateway.GatewayAPIClient])
|
||||
o.GatewaySelector = gatewaySelector
|
||||
}
|
||||
}
|
||||
|
||||
// Mux defines the muxer for the service
|
||||
func Mux(m *chi.Mux) Option {
|
||||
return func(o *Options) {
|
||||
o.Mux = m
|
||||
}
|
||||
}
|
||||
|
||||
// HistoryClient adds a grpc client for the eventhistory service
|
||||
func HistoryClient(hc ehsvc.EventHistoryService) Option {
|
||||
return func(o *Options) {
|
||||
o.HistoryClient = hc
|
||||
}
|
||||
}
|
||||
|
||||
67
services/activitylog/pkg/service/response.go
Normal file
67
services/activitylog/pkg/service/response.go
Normal file
@@ -0,0 +1,67 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/cs3org/reva/v2/pkg/events"
|
||||
"github.com/cs3org/reva/v2/pkg/storagespace"
|
||||
)
|
||||
|
||||
// GetActivitiesResponse is the response on GET activities requests
|
||||
type GetActivitiesResponse struct {
|
||||
Activities []Activity `json:"value"`
|
||||
}
|
||||
|
||||
// Activity represents an activity as it is returned to the client
|
||||
type Activity struct {
|
||||
ID string `json:"id"`
|
||||
|
||||
// TODO: Implement these
|
||||
Action interface{} `json:"action"`
|
||||
DriveItem Resource `json:"driveItem"`
|
||||
Actor Actor `json:"actor"`
|
||||
Times Times `json:"times"`
|
||||
|
||||
Template Template `json:"template"`
|
||||
}
|
||||
|
||||
// Resource represents an item such as a file or folder
|
||||
type Resource struct {
|
||||
ID string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
// Actor represents the user who performed the Action
|
||||
type Actor struct {
|
||||
ID string `json:"id"`
|
||||
DisplayName string `json:"displayName"`
|
||||
}
|
||||
|
||||
// Times represents the timestamps of the Activity
|
||||
type Times struct {
|
||||
RecordedTime time.Time `json:"recordedTime"`
|
||||
}
|
||||
|
||||
// Template contains activity details
|
||||
type Template struct {
|
||||
Message string `json:"message"`
|
||||
Variables map[string]interface{} `json:"variables"`
|
||||
}
|
||||
|
||||
// UploadReady converts a UploadReady events to an Activity
|
||||
func UploadReady(eid string, e events.UploadReady) Activity {
|
||||
rid, _ := storagespace.FormatReference(e.FileRef)
|
||||
res := Resource{
|
||||
ID: rid,
|
||||
Name: e.Filename,
|
||||
}
|
||||
return Activity{
|
||||
ID: eid,
|
||||
Template: Template{
|
||||
Message: "file created",
|
||||
Variables: map[string]interface{}{
|
||||
"resource": res,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
|
||||
@@ -13,13 +14,15 @@ import (
|
||||
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
|
||||
"github.com/cs3org/reva/v2/pkg/storagespace"
|
||||
"github.com/cs3org/reva/v2/pkg/utils"
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/log"
|
||||
ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
|
||||
"github.com/owncloud/ocis/v2/services/activitylog/pkg/config"
|
||||
microstore "go-micro.dev/v4/store"
|
||||
)
|
||||
|
||||
// Activity represents an activity
|
||||
type Activity struct {
|
||||
// RawActivity represents an activity as it is stored in the activitylog store
|
||||
type RawActivity struct {
|
||||
EventID string `json:"event_id"`
|
||||
Depth int `json:"depth"`
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
@@ -27,11 +30,15 @@ type Activity struct {
|
||||
|
||||
// ActivitylogService logs events per resource
|
||||
type ActivitylogService struct {
|
||||
cfg *config.Config
|
||||
log log.Logger
|
||||
events <-chan events.Event
|
||||
store microstore.Store
|
||||
gws pool.Selectable[gateway.GatewayAPIClient]
|
||||
cfg *config.Config
|
||||
log log.Logger
|
||||
events <-chan events.Event
|
||||
store microstore.Store
|
||||
gws pool.Selectable[gateway.GatewayAPIClient]
|
||||
mux *chi.Mux
|
||||
evHistory ehsvc.EventHistoryService
|
||||
|
||||
registeredEvents map[string]events.Unmarshaller
|
||||
}
|
||||
|
||||
// New creates a new ActivitylogService
|
||||
@@ -55,13 +62,27 @@ func New(opts ...Option) (*ActivitylogService, error) {
|
||||
}
|
||||
|
||||
s := &ActivitylogService{
|
||||
log: o.Logger,
|
||||
cfg: o.Config,
|
||||
events: ch,
|
||||
store: o.Store,
|
||||
gws: o.GatewaySelector,
|
||||
log: o.Logger,
|
||||
cfg: o.Config,
|
||||
events: ch,
|
||||
store: o.Store,
|
||||
gws: o.GatewaySelector,
|
||||
mux: o.Mux,
|
||||
evHistory: o.HistoryClient,
|
||||
registeredEvents: make(map[string]events.Unmarshaller),
|
||||
}
|
||||
|
||||
s.mux.Route("/graph/v1.0/drives/{drive-id}", func(r chi.Router) {
|
||||
r.Get("/items/{item-id}/activities", s.HandleGetItemActivities)
|
||||
})
|
||||
|
||||
for _, e := range o.RegisteredEvents {
|
||||
typ := reflect.TypeOf(e)
|
||||
s.registeredEvents[typ.String()] = e
|
||||
}
|
||||
|
||||
go s.Run()
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
@@ -155,7 +176,7 @@ func (a *ActivitylogService) AddActivityTrashed(resourceID *provider.ResourceId,
|
||||
}
|
||||
|
||||
// Activities returns the activities for the given resource
|
||||
func (a *ActivitylogService) Activities(rid *provider.ResourceId) ([]Activity, error) {
|
||||
func (a *ActivitylogService) Activities(rid *provider.ResourceId) ([]RawActivity, error) {
|
||||
resourceID := storagespace.FormatResourceID(*rid)
|
||||
|
||||
records, err := a.store.Read(resourceID)
|
||||
@@ -164,10 +185,10 @@ func (a *ActivitylogService) Activities(rid *provider.ResourceId) ([]Activity, e
|
||||
}
|
||||
|
||||
if len(records) == 0 {
|
||||
return []Activity{}, nil
|
||||
return []RawActivity{}, nil
|
||||
}
|
||||
|
||||
var activities []Activity
|
||||
var activities []RawActivity
|
||||
if err := json.Unmarshal(records[0].Value, &activities); err != nil {
|
||||
return nil, fmt.Errorf("could not unmarshal activities: %w", err)
|
||||
}
|
||||
@@ -214,7 +235,7 @@ func (a *ActivitylogService) storeActivity(rid *provider.ResourceId, eventID str
|
||||
return err
|
||||
}
|
||||
|
||||
var activities []Activity
|
||||
var activities []RawActivity
|
||||
if len(records) > 0 {
|
||||
if err := json.Unmarshal(records[0].Value, &activities); err != nil {
|
||||
return err
|
||||
@@ -222,7 +243,7 @@ func (a *ActivitylogService) storeActivity(rid *provider.ResourceId, eventID str
|
||||
}
|
||||
|
||||
// TODO: max len check?
|
||||
activities = append(activities, Activity{
|
||||
activities = append(activities, RawActivity{
|
||||
EventID: eventID,
|
||||
Depth: depth,
|
||||
Timestamp: timestamp,
|
||||
|
||||
@@ -33,9 +33,10 @@ func DefaultConfig() *config.Config {
|
||||
EnableTLS: false,
|
||||
},
|
||||
Store: config.Store{
|
||||
Store: "memory",
|
||||
Store: "nats-js-kv",
|
||||
Nodes: []string{"127.0.0.1:9233"},
|
||||
Database: "eventhistory",
|
||||
Table: "events",
|
||||
Table: "",
|
||||
TTL: 336 * time.Hour,
|
||||
},
|
||||
GRPC: config.GRPCConfig{
|
||||
|
||||
@@ -236,6 +236,17 @@ func DefaultPolicies() []config.Policy {
|
||||
Endpoint: "/app/", // /app or /apps? ocdav only handles /apps
|
||||
Service: "com.owncloud.web.frontend",
|
||||
},
|
||||
// reroute activities endpoint to activitylog service
|
||||
// {
|
||||
// Type: config.RegexRoute,
|
||||
// Endpoint: "/graph/v1.0/drives/[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}/items/[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}/activities",
|
||||
// Service: "com.owncloud.web.activitylog",
|
||||
// },
|
||||
{
|
||||
Type: config.RegexRoute,
|
||||
Endpoint: "/graph/v1.0/drives/[^/]+/items/[^/]+/activities",
|
||||
Service: "com.owncloud.web.activitylog",
|
||||
},
|
||||
{
|
||||
Endpoint: "/graph/v1.0/invitations",
|
||||
Service: "com.owncloud.web.invitations",
|
||||
|
||||
Reference in New Issue
Block a user