mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-01-05 11:51:16 -06:00
add nats-js-kv connection to graph
Signed-off-by: Christian Richter <c.richter@opencloud.eu> # Conflicts: # services/graph/pkg/service/v0/service.go
This commit is contained in:
committed by
Ralf Haferkamp
parent
63f976cac1
commit
07a9308c4c
@@ -15,6 +15,10 @@ import (
|
||||
"github.com/opencloud-eu/opencloud/services/graph/pkg/metrics"
|
||||
"github.com/opencloud-eu/opencloud/services/graph/pkg/server/debug"
|
||||
"github.com/opencloud-eu/opencloud/services/graph/pkg/server/http"
|
||||
microstore "go-micro.dev/v4/store"
|
||||
|
||||
"github.com/opencloud-eu/reva/v2/pkg/store"
|
||||
|
||||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
|
||||
@@ -46,9 +50,18 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
|
||||
gr := runner.NewGroup()
|
||||
{
|
||||
persistence := store.Create(
|
||||
store.Store(cfg.Store.Store),
|
||||
microstore.Nodes(cfg.Store.Nodes...),
|
||||
microstore.Database(cfg.Store.Database),
|
||||
microstore.Table(cfg.Store.Table),
|
||||
store.Authentication(cfg.Store.AuthUsername, cfg.Store.AuthPassword),
|
||||
)
|
||||
|
||||
server, err := http.Server(
|
||||
http.Logger(logger),
|
||||
http.Context(ctx),
|
||||
http.Store(persistence),
|
||||
http.Config(cfg),
|
||||
http.Metrics(mtrcs),
|
||||
http.TraceProvider(traceProvider),
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"github.com/opencloud-eu/opencloud/services/graph/pkg/config"
|
||||
"github.com/opencloud-eu/opencloud/services/graph/pkg/metrics"
|
||||
"github.com/urfave/cli/v2"
|
||||
microstore "go-micro.dev/v4/store"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
@@ -22,6 +23,7 @@ type Options struct {
|
||||
Flags []cli.Flag
|
||||
Namespace string
|
||||
TraceProvider trace.TracerProvider
|
||||
Store microstore.Store
|
||||
}
|
||||
|
||||
// newOptions initializes the available default options.
|
||||
@@ -77,6 +79,13 @@ func Namespace(val string) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// Store configures the store to use
|
||||
func Store(store microstore.Store) Option {
|
||||
return func(o *Options) {
|
||||
o.Store = store
|
||||
}
|
||||
}
|
||||
|
||||
// TraceProvider provides a function to set the TraceProvider option.
|
||||
func TraceProvider(val trace.TracerProvider) Option {
|
||||
return func(o *Options) {
|
||||
|
||||
@@ -178,6 +178,7 @@ func Server(opts ...Option) (http.Service, error) {
|
||||
svc.KeycloakClient(keyCloakClient),
|
||||
svc.EventHistoryClient(hClient),
|
||||
svc.TraceProvider(options.TraceProvider),
|
||||
svc.Store(options.Store),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
storageprovider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/jellydator/ttlcache/v3"
|
||||
"github.com/nats-io/nats.go"
|
||||
"go-micro.dev/v4/client"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"google.golang.org/protobuf/types/known/emptypb"
|
||||
@@ -67,6 +68,7 @@ type Graph struct {
|
||||
keycloakClient keycloak.Client
|
||||
historyClient ehsvc.EventHistoryService
|
||||
traceProvider trace.TracerProvider
|
||||
natskv nats.KeyValue
|
||||
}
|
||||
|
||||
// ServeHTTP implements the Service interface.
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
|
||||
"github.com/opencloud-eu/reva/v2/pkg/events"
|
||||
"github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool"
|
||||
microstore "go-micro.dev/v4/store"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
"github.com/opencloud-eu/opencloud/pkg/keycloak"
|
||||
@@ -42,6 +43,7 @@ type Options struct {
|
||||
SearchService searchsvc.SearchProviderService
|
||||
KeycloakClient keycloak.Client
|
||||
EventHistoryClient ehsvc.EventHistoryService
|
||||
Store microstore.Store
|
||||
TraceProvider trace.TracerProvider
|
||||
}
|
||||
|
||||
@@ -175,6 +177,13 @@ func EventHistoryClient(val ehsvc.EventHistoryService) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// Store configures the store to use
|
||||
func Store(store microstore.Store) Option {
|
||||
return func(o *Options) {
|
||||
o.Store = store
|
||||
}
|
||||
}
|
||||
|
||||
// TraceProvider provides a function to set the TraceProvider option.
|
||||
func TraceProvider(val trace.TracerProvider) Option {
|
||||
return func(o *Options) {
|
||||
|
||||
@@ -15,6 +15,7 @@ import (
|
||||
"github.com/go-chi/chi/v5/middleware"
|
||||
ldapv3 "github.com/go-ldap/ldap/v3"
|
||||
"github.com/jellydator/ttlcache/v3"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/riandyrn/otelchi"
|
||||
microstore "go-micro.dev/v4/store"
|
||||
|
||||
@@ -23,6 +24,7 @@ import (
|
||||
"github.com/opencloud-eu/reva/v2/pkg/store"
|
||||
"github.com/opencloud-eu/reva/v2/pkg/utils"
|
||||
"github.com/opencloud-eu/reva/v2/pkg/utils/ldap"
|
||||
"github.com/pkg/errors"
|
||||
|
||||
ocldap "github.com/opencloud-eu/opencloud/pkg/ldap"
|
||||
"github.com/opencloud-eu/opencloud/pkg/log"
|
||||
@@ -153,6 +155,37 @@ func NewService(opts ...Option) (Graph, error) { //nolint:maintidx
|
||||
identity.IdentityCacheWithGroupsTTL(time.Duration(options.Config.Spaces.GroupsCacheTTL)),
|
||||
)
|
||||
|
||||
// Connect to NATS servers
|
||||
natsOptions := nats.Options{
|
||||
Servers: options.Config.Store.Nodes,
|
||||
}
|
||||
conn, err := natsOptions.Connect()
|
||||
if err != nil {
|
||||
return Graph{}, err
|
||||
}
|
||||
|
||||
js, err := conn.JetStream()
|
||||
if err != nil {
|
||||
return Graph{}, err
|
||||
}
|
||||
|
||||
kv, err := js.KeyValue(options.Config.Store.Database)
|
||||
if err != nil {
|
||||
if !errors.Is(err, nats.ErrBucketNotFound) {
|
||||
return Graph{}, errors.Wrapf(err, "Failed to get bucket (%s)", options.Config.Store.Database)
|
||||
}
|
||||
|
||||
kv, err = js.CreateKeyValue(&nats.KeyValueConfig{
|
||||
Bucket: options.Config.Store.Database,
|
||||
})
|
||||
if err != nil {
|
||||
return Graph{}, errors.Wrapf(err, "Failed to create bucket (%s)", options.Config.Store.Database)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return Graph{}, err
|
||||
}
|
||||
|
||||
baseGraphService := BaseGraphService{
|
||||
logger: &options.Logger,
|
||||
identityCache: identityCache,
|
||||
@@ -198,6 +231,7 @@ func NewService(opts ...Option) (Graph, error) { //nolint:maintidx
|
||||
historyClient: options.EventHistoryClient,
|
||||
traceProvider: options.TraceProvider,
|
||||
valueService: options.ValueService,
|
||||
natskv: kv,
|
||||
}
|
||||
|
||||
if err := setIdentityBackends(options, &svc); err != nil {
|
||||
|
||||
Reference in New Issue
Block a user