diff --git a/services/graph/pkg/command/server.go b/services/graph/pkg/command/server.go index 7a10c42cfb..2d43483dcf 100644 --- a/services/graph/pkg/command/server.go +++ b/services/graph/pkg/command/server.go @@ -15,6 +15,10 @@ import ( "github.com/opencloud-eu/opencloud/services/graph/pkg/metrics" "github.com/opencloud-eu/opencloud/services/graph/pkg/server/debug" "github.com/opencloud-eu/opencloud/services/graph/pkg/server/http" + microstore "go-micro.dev/v4/store" + + "github.com/opencloud-eu/reva/v2/pkg/store" + "github.com/urfave/cli/v2" ) @@ -46,9 +50,18 @@ func Server(cfg *config.Config) *cli.Command { gr := runner.NewGroup() { + persistence := store.Create( + store.Store(cfg.Store.Store), + microstore.Nodes(cfg.Store.Nodes...), + microstore.Database(cfg.Store.Database), + microstore.Table(cfg.Store.Table), + store.Authentication(cfg.Store.AuthUsername, cfg.Store.AuthPassword), + ) + server, err := http.Server( http.Logger(logger), http.Context(ctx), + http.Store(persistence), http.Config(cfg), http.Metrics(mtrcs), http.TraceProvider(traceProvider), diff --git a/services/graph/pkg/server/http/option.go b/services/graph/pkg/server/http/option.go index e4151518aa..e384b87e61 100644 --- a/services/graph/pkg/server/http/option.go +++ b/services/graph/pkg/server/http/option.go @@ -7,6 +7,7 @@ import ( "github.com/opencloud-eu/opencloud/services/graph/pkg/config" "github.com/opencloud-eu/opencloud/services/graph/pkg/metrics" "github.com/urfave/cli/v2" + microstore "go-micro.dev/v4/store" "go.opentelemetry.io/otel/trace" ) @@ -22,6 +23,7 @@ type Options struct { Flags []cli.Flag Namespace string TraceProvider trace.TracerProvider + Store microstore.Store } // newOptions initializes the available default options. @@ -77,6 +79,13 @@ func Namespace(val string) Option { } } +// Store configures the store to use +func Store(store microstore.Store) Option { + return func(o *Options) { + o.Store = store + } +} + // TraceProvider provides a function to set the TraceProvider option. func TraceProvider(val trace.TracerProvider) Option { return func(o *Options) { diff --git a/services/graph/pkg/server/http/server.go b/services/graph/pkg/server/http/server.go index cfa43c231f..d5ccf2b503 100644 --- a/services/graph/pkg/server/http/server.go +++ b/services/graph/pkg/server/http/server.go @@ -178,6 +178,7 @@ func Server(opts ...Option) (http.Service, error) { svc.KeycloakClient(keyCloakClient), svc.EventHistoryClient(hClient), svc.TraceProvider(options.TraceProvider), + svc.Store(options.Store), ) if err != nil { diff --git a/services/graph/pkg/service/v0/graph.go b/services/graph/pkg/service/v0/graph.go index dbcbef72bf..ced6c85f12 100644 --- a/services/graph/pkg/service/v0/graph.go +++ b/services/graph/pkg/service/v0/graph.go @@ -12,6 +12,7 @@ import ( storageprovider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/go-chi/chi/v5" "github.com/jellydator/ttlcache/v3" + "github.com/nats-io/nats.go" "go-micro.dev/v4/client" "go.opentelemetry.io/otel/trace" "google.golang.org/protobuf/types/known/emptypb" @@ -67,6 +68,7 @@ type Graph struct { keycloakClient keycloak.Client historyClient ehsvc.EventHistoryService traceProvider trace.TracerProvider + natskv nats.KeyValue } // ServeHTTP implements the Service interface. diff --git a/services/graph/pkg/service/v0/option.go b/services/graph/pkg/service/v0/option.go index b4ffa0ed20..516e7b55cf 100644 --- a/services/graph/pkg/service/v0/option.go +++ b/services/graph/pkg/service/v0/option.go @@ -7,6 +7,7 @@ import ( gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" "github.com/opencloud-eu/reva/v2/pkg/events" "github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool" + microstore "go-micro.dev/v4/store" "go.opentelemetry.io/otel/trace" "github.com/opencloud-eu/opencloud/pkg/keycloak" @@ -42,6 +43,7 @@ type Options struct { SearchService searchsvc.SearchProviderService KeycloakClient keycloak.Client EventHistoryClient ehsvc.EventHistoryService + Store microstore.Store TraceProvider trace.TracerProvider } @@ -175,6 +177,13 @@ func EventHistoryClient(val ehsvc.EventHistoryService) Option { } } +// Store configures the store to use +func Store(store microstore.Store) Option { + return func(o *Options) { + o.Store = store + } +} + // TraceProvider provides a function to set the TraceProvider option. func TraceProvider(val trace.TracerProvider) Option { return func(o *Options) { diff --git a/services/graph/pkg/service/v0/service.go b/services/graph/pkg/service/v0/service.go index 7377960c74..34f718d79b 100644 --- a/services/graph/pkg/service/v0/service.go +++ b/services/graph/pkg/service/v0/service.go @@ -15,6 +15,7 @@ import ( "github.com/go-chi/chi/v5/middleware" ldapv3 "github.com/go-ldap/ldap/v3" "github.com/jellydator/ttlcache/v3" + "github.com/nats-io/nats.go" "github.com/riandyrn/otelchi" microstore "go-micro.dev/v4/store" @@ -23,6 +24,7 @@ import ( "github.com/opencloud-eu/reva/v2/pkg/store" "github.com/opencloud-eu/reva/v2/pkg/utils" "github.com/opencloud-eu/reva/v2/pkg/utils/ldap" + "github.com/pkg/errors" ocldap "github.com/opencloud-eu/opencloud/pkg/ldap" "github.com/opencloud-eu/opencloud/pkg/log" @@ -153,6 +155,37 @@ func NewService(opts ...Option) (Graph, error) { //nolint:maintidx identity.IdentityCacheWithGroupsTTL(time.Duration(options.Config.Spaces.GroupsCacheTTL)), ) + // Connect to NATS servers + natsOptions := nats.Options{ + Servers: options.Config.Store.Nodes, + } + conn, err := natsOptions.Connect() + if err != nil { + return Graph{}, err + } + + js, err := conn.JetStream() + if err != nil { + return Graph{}, err + } + + kv, err := js.KeyValue(options.Config.Store.Database) + if err != nil { + if !errors.Is(err, nats.ErrBucketNotFound) { + return Graph{}, errors.Wrapf(err, "Failed to get bucket (%s)", options.Config.Store.Database) + } + + kv, err = js.CreateKeyValue(&nats.KeyValueConfig{ + Bucket: options.Config.Store.Database, + }) + if err != nil { + return Graph{}, errors.Wrapf(err, "Failed to create bucket (%s)", options.Config.Store.Database) + } + } + if err != nil { + return Graph{}, err + } + baseGraphService := BaseGraphService{ logger: &options.Logger, identityCache: identityCache, @@ -198,6 +231,7 @@ func NewService(opts ...Option) (Graph, error) { //nolint:maintidx historyClient: options.EventHistoryClient, traceProvider: options.TraceProvider, valueService: options.ValueService, + natskv: kv, } if err := setIdentityBackends(options, &svc); err != nil {