mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-01-07 04:40:05 -06:00
add storage metadata to the runtime and fix its config parsing when running on supervised mode
This commit is contained in:
@@ -1,13 +1,6 @@
|
||||
package command
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/owncloud/ocis/ocis/pkg/runtime"
|
||||
"log"
|
||||
"net"
|
||||
"net/rpc"
|
||||
"os"
|
||||
|
||||
cli "github.com/micro/cli/v2"
|
||||
|
||||
"github.com/owncloud/ocis/ocis/pkg/config"
|
||||
@@ -35,13 +28,14 @@ func RunCommand(cfg *config.Config) *cli.Command {
|
||||
},
|
||||
},
|
||||
Action: func(c *cli.Context) error {
|
||||
client, err := rpc.DialHTTP("tcp", net.JoinHostPort(cfg.Runtime.Hostname, cfg.Runtime.Port))
|
||||
if err != nil {
|
||||
log.Fatal("dialing:", err)
|
||||
}
|
||||
|
||||
res := runtime.RunService(client, os.Args[2])
|
||||
fmt.Println(res)
|
||||
// TODO(refs) this implementation changes as we don't depend on os threads anymore.
|
||||
//client, err := rpc.DialHTTP("tcp", net.JoinHostPort(cfg.Runtime.Hostname, cfg.Runtime.Port))
|
||||
//if err != nil {
|
||||
// log.Fatal("dialing:", err)
|
||||
//}
|
||||
//
|
||||
//res := runtime.RunService(client, os.Args[2])
|
||||
//fmt.Println(res)
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
@@ -2,12 +2,10 @@ package runtime
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
golog "log"
|
||||
"net/rpc"
|
||||
"os"
|
||||
"os/signal"
|
||||
"time"
|
||||
|
||||
settings "github.com/owncloud/ocis/settings/pkg/command"
|
||||
|
||||
"github.com/thejerf/suture"
|
||||
|
||||
@@ -18,8 +16,7 @@ import (
|
||||
|
||||
"github.com/owncloud/ocis/ocis/pkg/config"
|
||||
|
||||
"github.com/owncloud/ocis/ocis/pkg/runtime/process"
|
||||
settings "github.com/owncloud/ocis/settings/pkg/command"
|
||||
storage "github.com/owncloud/ocis/storage/pkg/command"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -96,15 +93,27 @@ func (r *Runtime) Start() error {
|
||||
// - to avoid this getting out of hands, a supervisor would need to be injected on each supervised service.
|
||||
// - each service would then add its execute func to the supervisor, and return its token (?)
|
||||
// - this runtime should only care about start / stop services, for that we use serviceTokens.
|
||||
// - normalize the use of panics so that suture can restart services that die.
|
||||
// - shutting down a service implies iterating over all the serviceToken for the given service name and terminating them.
|
||||
// - config file parsing with Viper is no longer possible as viper is not thread-safe (https://github.com/spf13/viper/issues/19)
|
||||
// - replace occurrences of log.Fatal in favor of panic() since the supervisor relies on panics.
|
||||
|
||||
// propagate reva log config to storage services
|
||||
r.c.Storage.Log.Level = r.c.Log.Level
|
||||
r.c.Storage.Log.Color = r.c.Log.Color
|
||||
r.c.Storage.Log.Pretty = r.c.Log.Pretty
|
||||
|
||||
tokens["settings"] = append(tokens["settings"], supervisor.Add(settings.NewSutureService(globalCtx, r.c.Settings)))
|
||||
tokens["storagemetadata"] = append(tokens["storagemetadata"], supervisor.Add(storage.NewStorageMetadata(globalCtx, r.c.Storage)))
|
||||
|
||||
go supervisor.ServeBackground()
|
||||
|
||||
<-halt
|
||||
globalCancel()
|
||||
close(halt)
|
||||
return nil
|
||||
select {
|
||||
case <-halt:
|
||||
globalCancel()
|
||||
close(halt)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// for logging reasons we don't want the same logging level on both oCIS and micro. As a framework builder we do not
|
||||
@@ -120,61 +129,3 @@ func setMicroLogger(log config.Log) {
|
||||
}
|
||||
logger.DefaultLogger = mzlog.NewLogger(logger.WithLevel(logger.Level(lev)))
|
||||
}
|
||||
|
||||
// Launch oCIS default oCIS extensions.
|
||||
func (r *Runtime) Launch() {
|
||||
var client *rpc.Client
|
||||
var err error
|
||||
var try int
|
||||
|
||||
for {
|
||||
if try >= maxRetries {
|
||||
golog.Fatal("could not get a connection to rpc runtime on localhost:10666")
|
||||
}
|
||||
client, err = rpc.DialHTTP("tcp", "localhost:10666")
|
||||
if err != nil {
|
||||
try++
|
||||
fmt.Println("runtime not available, retrying...")
|
||||
time.Sleep(1 * time.Second)
|
||||
} else {
|
||||
goto OUT
|
||||
}
|
||||
}
|
||||
|
||||
OUT:
|
||||
for _, v := range Extensions {
|
||||
RunService(client, v)
|
||||
}
|
||||
|
||||
if len(dependants) > 0 {
|
||||
time.Sleep(2 * time.Second)
|
||||
for _, v := range dependants {
|
||||
RunService(client, v)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// RunService sends a Service.Start command with the given service name to pman
|
||||
func RunService(client *rpc.Client, service string) int {
|
||||
args := process.NewProcEntry(service, os.Environ(), []string{service}...)
|
||||
|
||||
all := append(Extensions, dependants...)
|
||||
if !contains(all, service) {
|
||||
return 1
|
||||
}
|
||||
|
||||
var reply int
|
||||
if err := client.Call("Service.Start", args, &reply); err != nil {
|
||||
golog.Fatal(err)
|
||||
}
|
||||
return reply
|
||||
}
|
||||
|
||||
func contains(a []string, b string) bool {
|
||||
for i := range a {
|
||||
if a[i] == b {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -117,11 +117,11 @@ type SutureService struct {
|
||||
|
||||
// NewSutureService creates a new settings.SutureService
|
||||
func NewSutureService(ctx context.Context, cfg *config.Config) SutureService {
|
||||
sctx, scancel := context.WithCancel(ctx)
|
||||
sctx, cancel := context.WithCancel(ctx)
|
||||
cfg.Context = sctx // propagate the context down to the go-micro services.
|
||||
return SutureService{
|
||||
ctx: sctx,
|
||||
cancel: scancel,
|
||||
cancel: cancel,
|
||||
cfg: cfg,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,10 +4,11 @@ import (
|
||||
"os"
|
||||
|
||||
"github.com/owncloud/ocis/storage/pkg/command"
|
||||
"github.com/owncloud/ocis/storage/pkg/config"
|
||||
)
|
||||
|
||||
func main() {
|
||||
if err := command.Execute(); err != nil {
|
||||
if err := command.Execute(config.New()); err != nil {
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,9 +13,7 @@ import (
|
||||
)
|
||||
|
||||
// Execute is the entry point for the storage command.
|
||||
func Execute() error {
|
||||
cfg := config.New()
|
||||
|
||||
func Execute(cfg *config.Config) error {
|
||||
app := &cli.App{
|
||||
Name: "storage",
|
||||
Version: version.String,
|
||||
|
||||
@@ -2,6 +2,7 @@ package command
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"os"
|
||||
"os/signal"
|
||||
"path"
|
||||
@@ -23,9 +24,10 @@ import (
|
||||
// It provides a ocis-specific storage store metadata (shares,account,settings...)
|
||||
func StorageMetadata(cfg *config.Config) *cli.Command {
|
||||
return &cli.Command{
|
||||
Name: "storage-metadata",
|
||||
Usage: "Start storage-metadata service",
|
||||
Flags: flagset.StorageMetadata(cfg),
|
||||
Name: "storage-metadata",
|
||||
Usage: "Start storage-metadata service",
|
||||
// TODO(refs) at this point it might make sense delegate log flags to each individual storage command.
|
||||
Flags: append(flagset.StorageMetadata(cfg), flagset.RootWithConfig(cfg)...),
|
||||
Category: "Extensions",
|
||||
Before: func(c *cli.Context) error {
|
||||
storageRoot := c.String("storage-root")
|
||||
@@ -217,3 +219,43 @@ func StorageMetadata(cfg *config.Config) *cli.Command {
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// SutureService allows for the settings command to be embedded and supervised by a suture supervisor tree.
|
||||
type SutureService struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc // used to cancel the context go-micro services used to shutdown a service.
|
||||
cfg *config.Config
|
||||
}
|
||||
|
||||
// NewSutureService creates a new storagemetadata.SutureService
|
||||
func NewStorageMetadata(ctx context.Context, cfg *config.Config) SutureService {
|
||||
sctx, cancel := context.WithCancel(ctx)
|
||||
cfg.Context = sctx
|
||||
return SutureService{
|
||||
ctx: sctx,
|
||||
cancel: cancel,
|
||||
cfg: cfg,
|
||||
}
|
||||
}
|
||||
|
||||
func (s SutureService) Serve() {
|
||||
f := &flag.FlagSet{}
|
||||
for k := range StorageMetadata(s.cfg).Flags {
|
||||
if err := StorageMetadata(s.cfg).Flags[k].Apply(f); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
ctx := cli.NewContext(nil, f, nil)
|
||||
if StorageMetadata(s.cfg).Before != nil {
|
||||
if err := StorageMetadata(s.cfg).Before(ctx); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
if err := StorageMetadata(s.cfg).Action(ctx); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s SutureService) Stop() {
|
||||
s.cancel()
|
||||
}
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
package config
|
||||
|
||||
import "context"
|
||||
|
||||
// Log defines the available logging configuration.
|
||||
type Log struct {
|
||||
Level string
|
||||
@@ -376,6 +378,8 @@ type Config struct {
|
||||
Reva Reva
|
||||
Tracing Tracing
|
||||
Asset Asset
|
||||
|
||||
Context context.Context
|
||||
}
|
||||
|
||||
// New initializes a new configuration with or without defaults.
|
||||
|
||||
@@ -17,21 +17,20 @@ func RootWithConfig(cfg *config.Config) []cli.Flag {
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "log-level",
|
||||
Value: "info",
|
||||
Usage: "Set logging level",
|
||||
EnvVars: []string{"STORAGE_LOG_LEVEL"},
|
||||
EnvVars: []string{"STORAGE_LOG_LEVEL", "OCIS_LOG_LEVEL"},
|
||||
Destination: &cfg.Log.Level,
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "log-pretty",
|
||||
Usage: "Enable pretty logging",
|
||||
EnvVars: []string{"STORAGE_LOG_PRETTY"},
|
||||
EnvVars: []string{"STORAGE_LOG_PRETTY", "OCIS_LOG_PRETTY"},
|
||||
Destination: &cfg.Log.Pretty,
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "log-color",
|
||||
Usage: "Enable colored logging",
|
||||
EnvVars: []string{"STORAGE_LOG_COLOR"},
|
||||
EnvVars: []string{"STORAGE_LOG_COLOR", "OCIS_LOG_COLOR"},
|
||||
Destination: &cfg.Log.Color,
|
||||
},
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user