Merge branch 'master' into config-doc-descriptions

This commit is contained in:
Willy Kloucek
2022-06-28 13:03:19 +02:00
1251 changed files with 4036 additions and 3957 deletions

View File

@@ -0,0 +1,29 @@
issues:
exclude-rules:
- path: pkg/proto/v0
text: "SA1019:"
linters:
- staticcheck
linters:
enable:
- bodyclose
- deadcode
- errcheck
- gosimple
- govet
- ineffassign
- staticcheck
- structcheck
- typecheck
- unused
- varcheck
- depguard
- revive
- goimports
- unconvert
- scopelint
- maligned
- misspell
- gocritic
- prealloc
#- gosec

45
services/store/Makefile Normal file
View File

@@ -0,0 +1,45 @@
SHELL := bash
NAME := store
include ../../.make/recursion.mk
############ tooling ############
ifneq (, $(shell command -v go 2> /dev/null)) # suppress `command not found warnings` for non go targets in CI
include ../../.bingo/Variables.mk
endif
############ go tooling ############
include ../../.make/go.mk
############ release ############
include ../../.make/release.mk
############ docs generate ############
include ../../.make/docs.mk
.PHONY: docs-generate
docs-generate: config-docs-generate \
grpc-docs-generate
############ generate ############
include ../../.make/generate.mk
.PHONY: ci-go-generate
ci-go-generate: protobuf # CI runs ci-node-generate automatically before this target
.PHONY: ci-node-generate
ci-node-generate:
############ protobuf ############
include ../../.make/protobuf.mk
.PHONY: protobuf
protobuf: buf-generate
############ licenses ############
.PHONY: ci-node-check-licenses
ci-node-check-licenses:
.PHONY: ci-node-save-licenses
ci-node-save-licenses:

View File

@@ -0,0 +1,14 @@
package main
import (
"os"
"github.com/owncloud/ocis/v2/services/store/pkg/command"
"github.com/owncloud/ocis/v2/services/store/pkg/config/defaults"
)
func main() {
if err := command.Execute(defaults.DefaultConfig()); err != nil {
os.Exit(1)
}
}

View File

@@ -0,0 +1,19 @@
FROM amd64/alpine:latest
RUN apk update && \
apk upgrade && \
apk add ca-certificates mailcap && \
rm -rf /var/cache/apk/* && \
echo 'hosts: files dns' >| /etc/nsswitch.conf
LABEL maintainer="ownCloud GmbH <devops@owncloud.com>" \
org.label-schema.name="oCIS Store" \
org.label-schema.vendor="ownCloud GmbH" \
org.label-schema.schema-version="1.0"
EXPOSE 9460
ENTRYPOINT ["/usr/bin/ocis-store"]
CMD ["server"]
COPY bin/ocis-store /usr/bin/ocis-store

View File

@@ -0,0 +1,19 @@
FROM arm32v6/alpine:latest
RUN apk update && \
apk upgrade && \
apk add ca-certificates mailcap && \
rm -rf /var/cache/apk/* && \
echo 'hosts: files dns' >| /etc/nsswitch.conf
LABEL maintainer="ownCloud GmbH <devops@owncloud.com>" \
org.label-schema.name="oCIS Store" \
org.label-schema.vendor="ownCloud GmbH" \
org.label-schema.schema-version="1.0"
EXPOSE 9460
ENTRYPOINT ["/usr/bin/ocis-store"]
CMD ["server"]
COPY bin/ocis-store /usr/bin/ocis-store

View File

@@ -0,0 +1,19 @@
FROM arm64v8/alpine:latest
RUN apk update && \
apk upgrade && \
apk add ca-certificates mailcap && \
rm -rf /var/cache/apk/* && \
echo 'hosts: files dns' >| /etc/nsswitch.conf
LABEL maintainer="ownCloud GmbH <devops@owncloud.com>" \
org.label-schema.name="oCIS Store" \
org.label-schema.vendor="ownCloud GmbH" \
org.label-schema.schema-version="1.0"
EXPOSE 9460
ENTRYPOINT ["/usr/bin/ocis-store"]
CMD ["server"]
COPY bin/ocis-store /usr/bin/ocis-store

View File

@@ -0,0 +1,22 @@
image: owncloud/ocis-store:{{#if build.tag}}{{trimPrefix "v" build.tag}}{{else}}latest{{/if}}
{{#if build.tags}}
tags:
{{#each build.tags}}
- {{this}}
{{/each}}
{{/if}}
manifests:
- image: owncloud/ocis-store:{{#if build.tag}}{{trimPrefix "v" build.tag}}-{{/if}}linux-amd64
platform:
architecture: amd64
os: linux
- image: owncloud/ocis-store:{{#if build.tag}}{{trimPrefix "v" build.tag}}-{{/if}}linux-arm64
platform:
architecture: arm64
variant: v8
os: linux
- image: owncloud/ocis-store:{{#if build.tag}}{{trimPrefix "v" build.tag}}-{{/if}}linux-arm
platform:
architecture: arm
variant: v6
os: linux

View File

@@ -0,0 +1,57 @@
package command
import (
"fmt"
"net/http"
"github.com/owncloud/ocis/v2/services/store/pkg/config"
"github.com/owncloud/ocis/v2/services/store/pkg/config/parser"
"github.com/owncloud/ocis/v2/services/store/pkg/logging"
"github.com/urfave/cli/v2"
)
// Health is the entrypoint for the health command.
func Health(cfg *config.Config) *cli.Command {
return &cli.Command{
Name: "health",
Usage: "check health status",
Category: "info",
Before: func(c *cli.Context) error {
err := parser.ParseConfig(cfg)
if err != nil {
fmt.Printf("%v", err)
}
return err
},
Action: func(c *cli.Context) error {
logger := logging.Configure(cfg.Service.Name, cfg.Log)
resp, err := http.Get(
fmt.Sprintf(
"http://%s/healthz",
cfg.Debug.Addr,
),
)
if err != nil {
logger.Fatal().
Err(err).
Msg("Failed to request health check")
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
logger.Fatal().
Int("code", resp.StatusCode).
Msg("Health seems to be in bad state")
}
logger.Debug().
Int("code", resp.StatusCode).
Msg("Health got a good state")
return nil
},
}
}

View File

@@ -0,0 +1,64 @@
package command
import (
"context"
"os"
"github.com/owncloud/ocis/v2/ocis-pkg/clihelper"
ociscfg "github.com/owncloud/ocis/v2/ocis-pkg/config"
"github.com/owncloud/ocis/v2/services/store/pkg/config"
"github.com/thejerf/suture/v4"
"github.com/urfave/cli/v2"
)
// GetCommands provides all commands for this service
func GetCommands(cfg *config.Config) cli.Commands {
return []*cli.Command{
// start this service
Server(cfg),
// interaction with this service
// infos about this service
Health(cfg),
Version(cfg),
}
}
// Execute is the entry point for the ocis-store command.
func Execute(cfg *config.Config) error {
app := clihelper.DefaultApp(&cli.App{
Name: "store",
Usage: "Service to store values for ocis services",
Commands: GetCommands(cfg),
})
cli.HelpFlag = &cli.BoolFlag{
Name: "help,h",
Usage: "Show the help",
}
return app.Run(os.Args)
}
// SutureService allows for the store command to be embedded and supervised by a suture supervisor tree.
type SutureService struct {
cfg *config.Config
}
// NewSutureService creates a new store.SutureService
func NewSutureService(cfg *ociscfg.Config) suture.Service {
cfg.Store.Commons = cfg.Commons
return SutureService{
cfg: cfg.Store,
}
}
func (s SutureService) Serve(ctx context.Context) error {
s.cfg.Context = ctx
if err := Execute(s.cfg); err != nil {
return err
}
return nil
}

View File

@@ -0,0 +1,95 @@
package command
import (
"context"
"fmt"
"os"
"github.com/oklog/run"
"github.com/owncloud/ocis/v2/ocis-pkg/version"
"github.com/owncloud/ocis/v2/services/store/pkg/config"
"github.com/owncloud/ocis/v2/services/store/pkg/config/parser"
"github.com/owncloud/ocis/v2/services/store/pkg/logging"
"github.com/owncloud/ocis/v2/services/store/pkg/metrics"
"github.com/owncloud/ocis/v2/services/store/pkg/server/debug"
"github.com/owncloud/ocis/v2/services/store/pkg/server/grpc"
"github.com/owncloud/ocis/v2/services/store/pkg/tracing"
"github.com/urfave/cli/v2"
)
// Server is the entrypoint for the server command.
func Server(cfg *config.Config) *cli.Command {
return &cli.Command{
Name: "server",
Usage: fmt.Sprintf("start %s extension without runtime (unsupervised mode)", cfg.Service.Name),
Category: "server",
Before: func(c *cli.Context) error {
err := parser.ParseConfig(cfg)
if err != nil {
fmt.Printf("%v", err)
os.Exit(1)
}
return err
},
Action: func(c *cli.Context) error {
logger := logging.Configure(cfg.Service.Name, cfg.Log)
err := tracing.Configure(cfg)
if err != nil {
return err
}
var (
gr = run.Group{}
ctx, cancel = func() (context.Context, context.CancelFunc) {
if cfg.Context == nil {
return context.WithCancel(context.Background())
}
return context.WithCancel(cfg.Context)
}()
metrics = metrics.New()
)
defer cancel()
metrics.BuildInfo.WithLabelValues(version.GetString()).Set(1)
{
server := grpc.Server(
grpc.Logger(logger),
grpc.Context(ctx),
grpc.Config(cfg),
grpc.Metrics(metrics),
)
gr.Add(server.Run, func(_ error) {
logger.Info().
Str("server", "grpc").
Msg("Shutting down server")
cancel()
})
}
{
server, err := debug.Server(
debug.Logger(logger),
debug.Context(ctx),
debug.Config(cfg),
)
if err != nil {
logger.Error().Err(err).Str("server", "debug").Msg("Failed to initialize server")
return err
}
gr.Add(server.ListenAndServe, func(_ error) {
_ = server.Shutdown(ctx)
cancel()
})
}
return gr.Run()
},
}
}

View File

@@ -0,0 +1,50 @@
package command
import (
"fmt"
"os"
"github.com/owncloud/ocis/v2/ocis-pkg/registry"
"github.com/owncloud/ocis/v2/ocis-pkg/version"
tw "github.com/olekukonko/tablewriter"
"github.com/owncloud/ocis/v2/services/store/pkg/config"
"github.com/urfave/cli/v2"
)
// Version prints the service versions of all running instances.
func Version(cfg *config.Config) *cli.Command {
return &cli.Command{
Name: "version",
Usage: "print the version of this binary and the running extension instances",
Category: "info",
Action: func(c *cli.Context) error {
fmt.Println("Version: " + version.GetString())
fmt.Printf("Compiled: %s\n", version.Compiled())
fmt.Println("")
reg := registry.GetRegistry()
services, err := reg.GetService(cfg.GRPC.Namespace + "." + cfg.Service.Name)
if err != nil {
fmt.Println(fmt.Errorf("could not get %s services from the registry: %v", cfg.Service.Name, err))
return err
}
if len(services) == 0 {
fmt.Println("No running " + cfg.Service.Name + " service found.")
return nil
}
table := tw.NewWriter(os.Stdout)
table.SetHeader([]string{"Version", "Address", "Id"})
table.SetAutoFormatHeaders(false)
for _, s := range services {
for _, n := range s.Nodes {
table.Append([]string{s.Version, n.Address, n.Id})
}
}
table.Render()
return nil
},
}
}

View File

@@ -0,0 +1,24 @@
package config
import (
"context"
"github.com/owncloud/ocis/v2/ocis-pkg/shared"
)
// Config combines all available configuration parts.
type Config struct {
Commons *shared.Commons `yaml:"-"` // don't use this directly as configuration for a service
Service Service `yaml:"-"`
Tracing *Tracing `yaml:"tracing"`
Log *Log `yaml:"log"`
Debug Debug `yaml:"debug"`
GRPC GRPC `yaml:"grpc"`
Datapath string `yaml:"data_path" env:"STORE_DATA_PATH" desc:"Path for the store persistence directory."`
Context context.Context `yaml:"-"`
}

View File

@@ -0,0 +1,9 @@
package config
// Debug defines the available debug configuration.
type Debug struct {
Addr string `yaml:"addr" env:"STORE_DEBUG_ADDR" desc:"Bind address of the debug server, where metrics, health, config and debug endpoints will be exposed."`
Token string `yaml:"token" env:"STORE_DEBUG_TOKEN" desc:"Token to secure the metrics endpoint"`
Pprof bool `yaml:"pprof" env:"STORE_DEBUG_PPROF" desc:"Enables pprof, which can be used for profiling"`
Zpages bool `yaml:"zpages" env:"STORE_DEBUG_ZPAGES" desc:"Enables zpages, which can be used for collecting and viewing in-memory traces."`
}

View File

@@ -0,0 +1,63 @@
package defaults
import (
"path"
"github.com/owncloud/ocis/v2/ocis-pkg/config/defaults"
"github.com/owncloud/ocis/v2/services/store/pkg/config"
)
func FullDefaultConfig() *config.Config {
cfg := DefaultConfig()
EnsureDefaults(cfg)
Sanitize(cfg)
return cfg
}
func DefaultConfig() *config.Config {
return &config.Config{
Debug: config.Debug{
Addr: "127.0.0.1:9464",
Token: "",
Pprof: false,
Zpages: false,
},
GRPC: config.GRPC{
Addr: "127.0.0.1:9460",
Namespace: "com.owncloud.api",
},
Service: config.Service{
Name: "store",
},
Datapath: path.Join(defaults.BaseDataPath(), "store"),
}
}
func EnsureDefaults(cfg *config.Config) {
// provide with defaults for shared logging, since we need a valid destination address for BindEnv.
if cfg.Log == nil && cfg.Commons != nil && cfg.Commons.Log != nil {
cfg.Log = &config.Log{
Level: cfg.Commons.Log.Level,
Pretty: cfg.Commons.Log.Pretty,
Color: cfg.Commons.Log.Color,
File: cfg.Commons.Log.File,
}
} else if cfg.Log == nil {
cfg.Log = &config.Log{}
}
// provide with defaults for shared tracing, since we need a valid destination address for BindEnv.
if cfg.Tracing == nil && cfg.Commons != nil && cfg.Commons.Tracing != nil {
cfg.Tracing = &config.Tracing{
Enabled: cfg.Commons.Tracing.Enabled,
Type: cfg.Commons.Tracing.Type,
Endpoint: cfg.Commons.Tracing.Endpoint,
Collector: cfg.Commons.Tracing.Collector,
}
} else if cfg.Tracing == nil {
cfg.Tracing = &config.Tracing{}
}
}
func Sanitize(cfg *config.Config) {
// nothing to sanitize here atm
}

View File

@@ -0,0 +1,7 @@
package config
// GRPC defines the available grpc configuration.
type GRPC struct {
Addr string `yaml:"addr" env:"STORE_GRPC_ADDR" desc:"The bind address of the GRPC service."`
Namespace string `yaml:"-"`
}

View File

@@ -0,0 +1,9 @@
package config
// Log defines the available log configuration.
type Log struct {
Level string `mapstructure:"level" env:"OCIS_LOG_LEVEL;STORE_LOG_LEVEL" desc:"The log level. Valid values are: \"panic\", \"fatal\", \"error\", \"warn\", \"info\", \"debug\", \"trace\"."`
Pretty bool `mapstructure:"pretty" env:"OCIS_LOG_PRETTY;STORE_LOG_PRETTY" desc:"Activates pretty log output."`
Color bool `mapstructure:"color" env:"OCIS_LOG_COLOR;STORE_LOG_COLOR" desc:"Activates colorized log output."`
File string `mapstructure:"file" env:"OCIS_LOG_FILE;STORE_LOG_FILE" desc:"The path to the log file. Activates logging to this file if set."`
}

View File

@@ -0,0 +1,38 @@
package parser
import (
"errors"
ociscfg "github.com/owncloud/ocis/v2/ocis-pkg/config"
"github.com/owncloud/ocis/v2/services/store/pkg/config"
"github.com/owncloud/ocis/v2/services/store/pkg/config/defaults"
"github.com/owncloud/ocis/v2/ocis-pkg/config/envdecode"
)
// ParseConfig loads configuration from known paths.
func ParseConfig(cfg *config.Config) error {
_, err := ociscfg.BindSourcesToStructs(cfg.Service.Name, cfg)
if err != nil {
return err
}
defaults.EnsureDefaults(cfg)
// load all env variables relevant to the config in the current context.
if err := envdecode.Decode(cfg); err != nil {
// no environment variable set for this config is an expected "error"
if !errors.Is(err, envdecode.ErrNoTargetFieldsAreSet) {
return err
}
}
// sanitize config
defaults.Sanitize(cfg)
return Validate(cfg)
}
func Validate(cfg *config.Config) error {
return nil
}

View File

@@ -0,0 +1,6 @@
package config
// Service defines the available service configuration.
type Service struct {
Name string `yaml:"-"`
}

View File

@@ -0,0 +1,9 @@
package config
// Tracing defines the available tracing configuration.
type Tracing struct {
Enabled bool `yaml:"enabled" env:"OCIS_TRACING_ENABLED;STORE_TRACING_ENABLED" desc:"Activates tracing."`
Type string `yaml:"type" env:"OCIS_TRACING_TYPE;STORE_TRACING_TYPE" desc:"The type of tracing. Defaults to \"\", which is the same as \"jaeger\". Allowed tracing types are \"jaeger\" and \"\" as of now."`
Endpoint string `yaml:"endpoint" env:"OCIS_TRACING_ENDPOINT;STORE_TRACING_ENDPOINT" desc:"The endpoint of the tracing agent."`
Collector string `yaml:"collector" env:"OCIS_TRACING_COLLECTOR;STORE_TRACING_COLLECTOR" desc:"The HTTP endpoint for sending spans directly to a collector, i.e. http://jaeger-collector:14268/api/traces. Only used if the tracing endpoint is unset."`
}

View File

@@ -0,0 +1,17 @@
package logging
import (
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/services/store/pkg/config"
)
// LoggerFromConfig initializes a service-specific logger instance.
func Configure(name string, cfg *config.Log) log.Logger {
return log.NewLogger(
log.Name(name),
log.Level(cfg.Level),
log.Pretty(cfg.Pretty),
log.Color(cfg.Color),
log.File(cfg.File),
)
}

View File

@@ -0,0 +1,45 @@
package metrics
import "github.com/prometheus/client_golang/prometheus"
var (
// Namespace defines the namespace for the defines metrics.
Namespace = "ocis"
// Subsystem defines the subsystem for the defines metrics.
Subsystem = "store"
)
// Metrics defines the available metrics of this service.
type Metrics struct {
// Counter *prometheus.CounterVec
BuildInfo *prometheus.GaugeVec
}
// New initializes the available metrics.
func New() *Metrics {
m := &Metrics{
// Counter: prometheus.NewCounterVec(prometheus.CounterOpts{
// Namespace: Namespace,
// Subsystem: Subsystem,
// Name: "greet_total",
// Help: "How many greeting requests processed",
// }, []string{}),
BuildInfo: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "build_info",
Help: "Build Information",
}, []string{"version"}),
}
// prometheus.Register(
// m.Counter,
// )
_ = prometheus.Register(
m.BuildInfo,
)
return m
}

View File

@@ -0,0 +1,50 @@
package debug
import (
"context"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/services/store/pkg/config"
)
// Option defines a single option function.
type Option func(o *Options)
// Options defines the available options for this package.
type Options struct {
Logger log.Logger
Context context.Context
Config *config.Config
}
// newOptions initializes the available default options.
func newOptions(opts ...Option) Options {
opt := Options{}
for _, o := range opts {
o(&opt)
}
return opt
}
// Logger provides a function to set the logger option.
func Logger(val log.Logger) Option {
return func(o *Options) {
o.Logger = val
}
}
// Context provides a function to set the context option.
func Context(val context.Context) Option {
return func(o *Options) {
o.Context = val
}
}
// Config provides a function to set the config option.
func Config(val *config.Config) Option {
return func(o *Options) {
o.Config = val
}
}

View File

@@ -0,0 +1,59 @@
package debug
import (
"io"
"net/http"
"github.com/owncloud/ocis/v2/ocis-pkg/service/debug"
"github.com/owncloud/ocis/v2/ocis-pkg/version"
"github.com/owncloud/ocis/v2/services/store/pkg/config"
)
// Server initializes the debug service and server.
func Server(opts ...Option) (*http.Server, error) {
options := newOptions(opts...)
return debug.NewService(
debug.Logger(options.Logger),
debug.Name(options.Config.Service.Name),
debug.Version(version.GetString()),
debug.Address(options.Config.Debug.Addr),
debug.Token(options.Config.Debug.Token),
debug.Pprof(options.Config.Debug.Pprof),
debug.Zpages(options.Config.Debug.Zpages),
debug.Health(health(options.Config)),
debug.Ready(ready(options.Config)),
), nil
}
// health implements the health check.
func health(cfg *config.Config) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain")
w.WriteHeader(http.StatusOK)
// TODO: check if services are up and running
_, err := io.WriteString(w, http.StatusText(http.StatusOK))
// io.WriteString should not fail but if it does we want to know.
if err != nil {
panic(err)
}
}
}
// ready implements the ready check.
func ready(cfg *config.Config) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain")
w.WriteHeader(http.StatusOK)
// TODO: check if services are up and running
_, err := io.WriteString(w, http.StatusText(http.StatusOK))
// io.WriteString should not fail but if it does we want to know.
if err != nil {
panic(err)
}
}
}

View File

@@ -0,0 +1,76 @@
package grpc
import (
"context"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/services/store/pkg/config"
"github.com/owncloud/ocis/v2/services/store/pkg/metrics"
"github.com/urfave/cli/v2"
)
// Option defines a single option function.
type Option func(o *Options)
// Options defines the available options for this package.
type Options struct {
Name string
Logger log.Logger
Context context.Context
Config *config.Config
Metrics *metrics.Metrics
Flags []cli.Flag
}
// newOptions initializes the available default options.
func newOptions(opts ...Option) Options {
opt := Options{}
for _, o := range opts {
o(&opt)
}
return opt
}
// Name provides a name for the service.
func Name(val string) Option {
return func(o *Options) {
o.Name = val
}
}
// Logger provides a function to set the logger option.
func Logger(val log.Logger) Option {
return func(o *Options) {
o.Logger = val
}
}
// Context provides a function to set the context option.
func Context(val context.Context) Option {
return func(o *Options) {
o.Context = val
}
}
// Config provides a function to set the config option.
func Config(val *config.Config) Option {
return func(o *Options) {
o.Config = val
}
}
// Metrics provides a function to set the metrics option.
func Metrics(val *metrics.Metrics) Option {
return func(o *Options) {
o.Metrics = val
}
}
// Flags provides a function to set the flags option.
func Flags(val []cli.Flag) Option {
return func(o *Options) {
o.Flags = append(o.Flags, val...)
}
}

View File

@@ -0,0 +1,36 @@
package grpc
import (
"github.com/owncloud/ocis/v2/ocis-pkg/service/grpc"
"github.com/owncloud/ocis/v2/ocis-pkg/version"
storesvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/store/v0"
svc "github.com/owncloud/ocis/v2/services/store/pkg/service/v0"
)
// Server initializes a new go-micro service ready to run
func Server(opts ...Option) grpc.Service {
options := newOptions(opts...)
service := grpc.NewService(
grpc.Namespace(options.Config.GRPC.Namespace),
grpc.Name(options.Config.Service.Name),
grpc.Version(version.GetString()),
grpc.Context(options.Context),
grpc.Address(options.Config.GRPC.Addr),
grpc.Logger(options.Logger),
grpc.Flags(options.Flags...),
)
hdlr, err := svc.New(
svc.Logger(options.Logger),
svc.Config(options.Config),
)
if err != nil {
options.Logger.Fatal().Err(err).Msg("could not initialize service handler")
}
if err = storesvc.RegisterStoreHandler(service.Server(), hdlr); err != nil {
options.Logger.Fatal().Err(err).Msg("could not register service handler")
}
return service
}

View File

@@ -0,0 +1,63 @@
package service
import (
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/services/store/pkg/config"
)
// Option defines a single option function.
type Option func(o *Options)
// Options defines the available options for this package.
type Options struct {
Logger log.Logger
Config *config.Config
Database, Table string
Nodes []string
}
func newOptions(opts ...Option) Options {
opt := Options{}
for _, o := range opts {
o(&opt)
}
return opt
}
// Logger provides a function to set the logger option.
func Logger(val log.Logger) Option {
return func(o *Options) {
o.Logger = val
}
}
// Database configures the database option.
func Database(val *config.Config) Option {
return func(o *Options) {
o.Config = val
}
}
// Table configures the Table option.
func Table(val *config.Config) Option {
return func(o *Options) {
o.Config = val
}
}
// Nodes configures the Nodes option.
func Nodes(val *config.Config) Option {
return func(o *Options) {
o.Config = val
}
}
// Config configures the Config option.
func Config(val *config.Config) Option {
return func(o *Options) {
o.Config = val
}
}

View File

@@ -0,0 +1,333 @@
package service
import (
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"github.com/blevesearch/bleve/v2"
"github.com/blevesearch/bleve/v2/analysis/analyzer/keyword"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
storemsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/store/v0"
storesvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/store/v0"
"github.com/owncloud/ocis/v2/services/store/pkg/config"
merrors "go-micro.dev/v4/errors"
"google.golang.org/protobuf/encoding/protojson"
)
// BleveDocument wraps the generated Record.Metadata and adds a property that is used to distinguish documents in the index.
type BleveDocument struct {
Metadata map[string]*storemsg.Field `json:"metadata"`
Database string `json:"database"`
Table string `json:"table"`
}
// New returns a new instance of Service
func New(opts ...Option) (s *Service, err error) {
options := newOptions(opts...)
logger := options.Logger
cfg := options.Config
recordsDir := filepath.Join(cfg.Datapath, "databases")
{
var fi os.FileInfo
if fi, err = os.Stat(recordsDir); err != nil {
if os.IsNotExist(err) {
// create store directory
if err = os.MkdirAll(recordsDir, 0700); err != nil {
return nil, err
}
}
} else if !fi.IsDir() {
return nil, fmt.Errorf("%s is not a directory", recordsDir)
}
}
indexMapping := bleve.NewIndexMapping()
// keep all symbols in terms to allow exact matching, eg. emails
indexMapping.DefaultAnalyzer = keyword.Name
s = &Service{
id: cfg.GRPC.Namespace + "." + cfg.Service.Name,
log: logger,
Config: cfg,
}
indexDir := filepath.Join(cfg.Datapath, "index.bleve")
// for now recreate index on every start
if err = os.RemoveAll(indexDir); err != nil {
return nil, err
}
if s.index, err = bleve.New(indexDir, indexMapping); err != nil {
return
}
if err = s.indexRecords(recordsDir); err != nil {
return nil, err
}
return
}
// Service implements the AccountsServiceHandler interface
type Service struct {
id string
log log.Logger
Config *config.Config
index bleve.Index
}
// Read implements the StoreHandler interface.
func (s *Service) Read(c context.Context, rreq *storesvc.ReadRequest, rres *storesvc.ReadResponse) error {
if len(rreq.Key) != 0 {
id := getID(rreq.Options.Database, rreq.Options.Table, rreq.Key)
file := filepath.Join(s.Config.Datapath, "databases", id)
var data []byte
rec := &storemsg.Record{}
data, err := ioutil.ReadFile(file)
if err != nil {
return merrors.NotFound(s.id, "could not read record")
}
if err = protojson.Unmarshal(data, rec); err != nil {
return merrors.InternalServerError(s.id, "could not unmarshal record")
}
rres.Records = append(rres.Records, rec)
return nil
}
s.log.Info().Interface("request", rreq).Msg("read request")
if rreq.Options.Where != nil {
// build bleve query
// execute search
// fetch the actual record if there's a hit
dtq := bleve.NewTermQuery(rreq.Options.Database)
ttq := bleve.NewTermQuery(rreq.Options.Table)
dtq.SetField("database")
ttq.SetField("table")
query := bleve.NewConjunctionQuery(dtq, ttq)
for k, v := range rreq.Options.Where {
ntq := bleve.NewTermQuery(v.Value)
ntq.SetField("metadata." + k + ".value")
query.AddQuery(ntq)
}
searchRequest := bleve.NewSearchRequest(query)
var searchResult *bleve.SearchResult
searchResult, err := s.index.Search(searchRequest)
if err != nil {
s.log.Error().Err(err).Msg("could not execute bleve search")
return merrors.InternalServerError(s.id, "could not execute bleve search: %v", err.Error())
}
for _, hit := range searchResult.Hits {
rec := &storemsg.Record{}
dest := filepath.Join(s.Config.Datapath, "databases", hit.ID)
var data []byte
data, err := ioutil.ReadFile(dest)
s.log.Info().Str("path", dest).Interface("hit", hit).Msgf("hit info")
if err != nil {
s.log.Info().Str("path", dest).Interface("hit", hit).Msgf("file not found")
return merrors.NotFound(s.id, "could not read record")
}
if err = protojson.Unmarshal(data, rec); err != nil {
return merrors.InternalServerError(s.id, "could not unmarshal record")
}
rres.Records = append(rres.Records, rec)
}
return nil
}
return merrors.InternalServerError(s.id, "neither id nor metadata present")
}
// Write implements the StoreHandler interface.
func (s *Service) Write(c context.Context, wreq *storesvc.WriteRequest, wres *storesvc.WriteResponse) error {
id := getID(wreq.Options.Database, wreq.Options.Table, wreq.Record.Key)
file := filepath.Join(s.Config.Datapath, "databases", id)
var bytes []byte
bytes, err := protojson.Marshal(wreq.Record)
if err != nil {
return merrors.InternalServerError(s.id, "could not marshal record")
}
err = os.MkdirAll(filepath.Dir(file), 0700)
if err != nil {
return err
}
err = ioutil.WriteFile(file, bytes, 0600)
if err != nil {
return merrors.InternalServerError(s.id, "could not write record")
}
doc := BleveDocument{
Metadata: wreq.Record.Metadata,
Database: wreq.Options.Database,
Table: wreq.Options.Table,
}
if err := s.index.Index(id, doc); err != nil {
s.log.Error().Err(err).Interface("document", doc).Msg("could not index record metadata")
return err
}
return nil
}
// Delete implements the StoreHandler interface.
func (s *Service) Delete(c context.Context, dreq *storesvc.DeleteRequest, dres *storesvc.DeleteResponse) error {
id := getID(dreq.Options.Database, dreq.Options.Table, dreq.Key)
file := filepath.Join(s.Config.Datapath, "databases", id)
if err := os.Remove(file); err != nil {
if os.IsNotExist(err) {
return merrors.NotFound(s.id, "could not find record")
}
return merrors.InternalServerError(s.id, "could not delete record")
}
if err := s.index.Delete(id); err != nil {
s.log.Error().Err(err).Str("id", id).Msg("could not remove record from index")
return merrors.InternalServerError(s.id, "could not remove record from index")
}
return nil
}
// List implements the StoreHandler interface.
func (s *Service) List(context.Context, *storesvc.ListRequest, storesvc.Store_ListStream) error {
return nil
}
// Databases implements the StoreHandler interface.
func (s *Service) Databases(c context.Context, dbreq *storesvc.DatabasesRequest, dbres *storesvc.DatabasesResponse) error {
file := filepath.Join(s.Config.Datapath, "databases")
f, err := os.Open(file)
if err != nil {
return merrors.InternalServerError(s.id, "could not open database directory")
}
defer f.Close()
dnames, err := f.Readdirnames(0)
if err != nil {
return merrors.InternalServerError(s.id, "could not read database directory")
}
dbres.Databases = dnames
return nil
}
// Tables implements the StoreHandler interface.
func (s *Service) Tables(ctx context.Context, in *storesvc.TablesRequest, out *storesvc.TablesResponse) error {
file := filepath.Join(s.Config.Datapath, "databases", in.Database)
f, err := os.Open(file)
if err != nil {
return merrors.InternalServerError(s.id, "could not open tables directory")
}
defer f.Close()
tnames, err := f.Readdirnames(0)
if err != nil {
return merrors.InternalServerError(s.id, "could not read tables directory")
}
out.Tables = tnames
return nil
}
// TODO sanitize key. As it may contain invalid characters, such as slashes.
// file: /tmp/ocis-store/databases/{database}/{table}/{record.key}.
func getID(database string, table string, key string) string {
// TODO sanitize input.
return filepath.Join(database, table, key)
}
func (s Service) indexRecords(recordsDir string) (err error) {
// TODO use filepath.Walk to clean up code
rh, err := os.Open(recordsDir)
if err != nil {
return merrors.InternalServerError(s.id, "could not open database directory")
}
defer rh.Close()
dbs, err := rh.Readdirnames(0)
if err != nil {
return merrors.InternalServerError(s.id, "could not read databases directory")
}
for i := range dbs {
tp := filepath.Join(s.Config.Datapath, "databases", dbs[i])
th, err := os.Open(tp)
if err != nil {
s.log.Error().Err(err).Str("database", dbs[i]).Msg("could not open database directory")
continue
}
defer th.Close()
tables, err := th.Readdirnames(0)
if err != nil {
s.log.Error().Err(err).Str("database", dbs[i]).Msg("could not read database directory")
continue
}
for j := range tables {
tp := filepath.Join(s.Config.Datapath, "databases", dbs[i], tables[j])
kh, err := os.Open(tp)
if err != nil {
s.log.Error().Err(err).Str("database", dbs[i]).Str("table", tables[i]).Msg("could not open table directory")
continue
}
defer kh.Close()
keys, err := kh.Readdirnames(0)
if err != nil {
s.log.Error().Err(err).Str("database", dbs[i]).Str("table", tables[i]).Msg("could not read table directory")
continue
}
for k := range keys {
id := getID(dbs[i], tables[j], keys[k])
kp := filepath.Join(s.Config.Datapath, "databases", id)
// read record
var data []byte
rec := &storemsg.Record{}
data, err = ioutil.ReadFile(kp)
if err != nil {
s.log.Error().Err(err).Str("id", id).Msg("could not read record")
continue
}
if err = protojson.Unmarshal(data, rec); err != nil {
s.log.Error().Err(err).Str("id", id).Msg("could not unmarshal record")
continue
}
// index record
doc := BleveDocument{
Metadata: rec.Metadata,
Database: dbs[i],
Table: tables[j],
}
if err := s.index.Index(id, doc); err != nil {
s.log.Error().Err(err).Interface("document", doc).Str("id", id).Msg("could not index record metadata")
continue
}
s.log.Debug().Str("id", id).Msg("indexed record")
}
}
}
return
}

View File

@@ -0,0 +1,23 @@
package tracing
import (
pkgtrace "github.com/owncloud/ocis/v2/ocis-pkg/tracing"
"github.com/owncloud/ocis/v2/services/store/pkg/config"
"go.opentelemetry.io/otel/trace"
)
var (
// TraceProvider is the global trace provider for the store service.
TraceProvider = trace.NewNoopTracerProvider()
)
func Configure(cfg *config.Config) error {
var err error
if cfg.Tracing.Enabled {
if TraceProvider, err = pkgtrace.GetTraceProvider(cfg.Tracing.Endpoint, cfg.Tracing.Collector, cfg.Service.Name, cfg.Tracing.Type); err != nil {
return err
}
}
return nil
}

View File

@@ -0,0 +1,3 @@
# backend
-r '^(cmd|pkg)/.*\.go$' -R '^node_modules/' -s -- sh -c 'make bin/ocis-store-debug && bin/ocis-store-debug --log-level debug server --debug-pprof --debug-zpages'
'