diff --git a/ocis/go.sum b/ocis/go.sum index 3a54a53270..8c1dc24ecd 100644 --- a/ocis/go.sum +++ b/ocis/go.sum @@ -1197,6 +1197,7 @@ github.com/ory/x v0.0.162/go.mod h1:sj3z/MeCrAyNFFTfN6yK1nTmHXGSFnw+QwIIQ/Rowec= github.com/ovh/go-ovh v0.0.0-20181109152953-ba5adb4cf014/go.mod h1:joRatxRJaZBsY3JAOEMcoOp05CnZzsx4scTxi95DHyQ= github.com/owncloud/ocis-hello v0.1.0-alpha1.0.20210204050952-c291e4c5b73f h1:yvJWb/wBKb0WLcdamuhND5jZNfh1SoFnql6RZ7pUStU= github.com/owncloud/ocis-hello v0.1.0-alpha1.0.20210204050952-c291e4c5b73f/go.mod h1:pCTUK78ioY3VPV0kMz3GzD6ZHL9E1R7+dMoMMBBA2eQ= +github.com/owncloud/ocis/ocis v0.0.0-20210305133343-0eda3b882b7c/go.mod h1:0PwZqflEkL3M6P0FXuIrEuHX9SJ0zA2MGYEjGAz0Aak= github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c h1:rp5dCmg/yLR3mgFuSOe4oEnDDmGLROTvMragMUXpTQw= github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c/go.mod h1:X07ZCGwUbLaax7L0S3Tw4hpejzu63ZrrQiUe6W0hcy0= github.com/parnurzeal/gorequest v0.2.15/go.mod h1:3Kh2QUMJoqw3icWAecsyzkpY7UzRfDhbRdTjtNwNiUE= diff --git a/ocis/pkg/command/kill.go b/ocis/pkg/command/kill.go index f87367c582..71690e0339 100644 --- a/ocis/pkg/command/kill.go +++ b/ocis/pkg/command/kill.go @@ -27,7 +27,7 @@ func KillCommand(cfg *config.Config) *cli.Command { }, &cli.StringFlag{ Name: "port", - Value: "10666", + Value: "6060", EnvVars: []string{"OCIS_RUNTIME_PORT"}, Destination: &cfg.Runtime.Port, }, diff --git a/ocis/pkg/command/run.go b/ocis/pkg/command/run.go index 2eb716e715..a658b13614 100644 --- a/ocis/pkg/command/run.go +++ b/ocis/pkg/command/run.go @@ -1,6 +1,11 @@ package command import ( + "fmt" + "log" + "net" + "net/rpc" + cli "github.com/micro/cli/v2" "github.com/owncloud/ocis/ocis/pkg/config" @@ -22,20 +27,25 @@ func RunCommand(cfg *config.Config) *cli.Command { }, &cli.StringFlag{ Name: "port", - Value: "10666", + Value: "6060", EnvVars: []string{"OCIS_RUNTIME_PORT"}, Destination: &cfg.Runtime.Port, }, }, Action: func(c *cli.Context) error { // TODO(refs) this implementation changes as we don't depend on os threads anymore. - //client, err := rpc.DialHTTP("tcp", net.JoinHostPort(cfg.Runtime.Hostname, cfg.Runtime.Port)) - //if err != nil { - // log.Fatal("dialing:", err) - //} - // - //res := runtime.RunService(client, os.Args[2]) - //fmt.Println(res) + client, err := rpc.DialHTTP("tcp", net.JoinHostPort(cfg.Runtime.Hostname, cfg.Runtime.Port)) + if err != nil { + log.Fatal("dialing:", err) + } + + var reply int + + if err := client.Call("Service.Start", "settings", &reply); err != nil { + log.Fatal(err) + } + fmt.Println(reply) + return nil }, } diff --git a/ocis/pkg/runtime/runtime.go b/ocis/pkg/runtime/runtime.go index 04b9063017..cacc12a662 100644 --- a/ocis/pkg/runtime/runtime.go +++ b/ocis/pkg/runtime/runtime.go @@ -1,25 +1,12 @@ package runtime import ( - "context" "os" - "os/signal" mzlog "github.com/asim/go-micro/plugins/logger/zerolog/v3" "github.com/asim/go-micro/v3/logger" - accounts "github.com/owncloud/ocis/accounts/pkg/command" - glauth "github.com/owncloud/ocis/glauth/pkg/command" - idp "github.com/owncloud/ocis/idp/pkg/command" "github.com/owncloud/ocis/ocis/pkg/config" - ocs "github.com/owncloud/ocis/ocs/pkg/command" - onlyoffice "github.com/owncloud/ocis/onlyoffice/pkg/command" - proxy "github.com/owncloud/ocis/proxy/pkg/command" - settings "github.com/owncloud/ocis/settings/pkg/command" - storage "github.com/owncloud/ocis/storage/pkg/command" - store "github.com/owncloud/ocis/store/pkg/command" - thumbnails "github.com/owncloud/ocis/thumbnails/pkg/command" - web "github.com/owncloud/ocis/web/pkg/command" - webdav "github.com/owncloud/ocis/webdav/pkg/command" + "github.com/owncloud/ocis/ocis/pkg/runtime/service" "github.com/rs/zerolog" "github.com/thejerf/suture" ) @@ -45,51 +32,58 @@ type serviceTokens map[string][]suture.ServiceToken var tokens = serviceTokens{} // Start rpc runtime +// TODO(refs) initialize a new Service struct +// - run the set of default services under the server command. func (r *Runtime) Start() error { - setMicroLogger(r.c.Log) - halt := make(chan os.Signal, 1) - signal.Notify(halt, os.Interrupt) + // 1. create new Service struct + // 2. start - supervisor := suture.NewSimple("ocis") - globalCtx, globalCancel := context.WithCancel(context.Background()) + return service.Start(service.WithConfig(r.c)) - // propagate reva log config to storage services - inheritedOptions := []storage.Option{ - storage.WithLogPretty(r.c.Log.Pretty), - storage.WithLogColor(r.c.Log.Color), - storage.WithLogLevel(r.c.Log.Level), - } - - addServiceToken("settings", supervisor.Add(settings.NewSutureService(globalCtx, r.c.Settings))) - addServiceToken("storage-metadata", supervisor.Add(storage.NewStorageMetadata(globalCtx, inheritedOptions...))) - addServiceToken("accounts", supervisor.Add(accounts.NewSutureService(globalCtx, r.c.Accounts))) - addServiceToken("glauth", supervisor.Add(glauth.NewSutureService(globalCtx, r.c.GLAuth))) - addServiceToken("idp", supervisor.Add(idp.NewSutureService(globalCtx, r.c.IDP))) - addServiceToken("ocs", supervisor.Add(ocs.NewSutureService(globalCtx, r.c.OCS))) - addServiceToken("onlyoffice", supervisor.Add(onlyoffice.NewSutureService(globalCtx, r.c.Onlyoffice))) - addServiceToken("proxy", supervisor.Add(proxy.NewSutureService(globalCtx, r.c.Proxy))) - addServiceToken("store", supervisor.Add(store.NewSutureService(globalCtx, r.c.Store))) - addServiceToken("thumbnails", supervisor.Add(thumbnails.NewSutureService(globalCtx, r.c.Thumbnails))) - addServiceToken("web", supervisor.Add(web.NewSutureService(globalCtx, r.c.Web))) - addServiceToken("webdav", supervisor.Add(webdav.NewSutureService(globalCtx, r.c.WebDAV))) - addServiceToken("storage-frontend", supervisor.Add(storage.NewFrontend(globalCtx, inheritedOptions...))) - addServiceToken("storage-gateway", supervisor.Add(storage.NewGateway(globalCtx, inheritedOptions...))) - addServiceToken("storage-users", supervisor.Add(storage.NewUsersProviderService(globalCtx, inheritedOptions...))) - addServiceToken("storage-groupsprovider", supervisor.Add(storage.NewGroupsProvider(globalCtx, inheritedOptions...))) // TODO(refs) panic? are we sending to a nil / closed channel? - addServiceToken("storage-authbasic", supervisor.Add(storage.NewAuthBasic(globalCtx, inheritedOptions...))) - addServiceToken("storage-authbearer", supervisor.Add(storage.NewAuthBearer(globalCtx, inheritedOptions...))) - addServiceToken("storage-home", supervisor.Add(storage.NewStorageHome(globalCtx, inheritedOptions...))) - addServiceToken("storage-users", supervisor.Add(storage.NewStorageUsers(globalCtx, inheritedOptions...))) - addServiceToken("storage-public-link", supervisor.Add(storage.NewStoragePublicLink(globalCtx, inheritedOptions...))) - addServiceToken("storage-sharing", supervisor.Add(storage.NewSharing(globalCtx, inheritedOptions...))) - - // TODO(refs) debug line with supervised services. - go supervisor.ServeBackground() - - <-halt - - globalCancel() - close(halt) + //setMicroLogger(r.c.Log) + //halt := make(chan os.Signal, 1) + //signal.Notify(halt, os.Interrupt) + // + //supervisor := suture.NewSimple("ocis") + //globalCtx, globalCancel := context.WithCancel(context.Background()) + // + //// propagate reva log config to storage services + //inheritedOptions := []storage.Option{ + // storage.WithLogPretty(r.c.Log.Pretty), + // storage.WithLogColor(r.c.Log.Color), + // storage.WithLogLevel(r.c.Log.Level), + //} + // + //addServiceToken("settings", supervisor.Add(settings.NewSutureService(globalCtx, r.c.Settings))) + //addServiceToken("storage-metadata", supervisor.Add(storage.NewStorageMetadata(globalCtx, inheritedOptions...))) + //addServiceToken("accounts", supervisor.Add(accounts.NewSutureService(globalCtx, r.c.Accounts))) + //addServiceToken("glauth", supervisor.Add(glauth.NewSutureService(globalCtx, r.c.GLAuth))) + //addServiceToken("idp", supervisor.Add(idp.NewSutureService(globalCtx, r.c.IDP))) + //addServiceToken("ocs", supervisor.Add(ocs.NewSutureService(globalCtx, r.c.OCS))) + //addServiceToken("onlyoffice", supervisor.Add(onlyoffice.NewSutureService(globalCtx, r.c.Onlyoffice))) + //addServiceToken("proxy", supervisor.Add(proxy.NewSutureService(globalCtx, r.c.Proxy))) + //addServiceToken("store", supervisor.Add(store.NewSutureService(globalCtx, r.c.Store))) + //addServiceToken("thumbnails", supervisor.Add(thumbnails.NewSutureService(globalCtx, r.c.Thumbnails))) + //addServiceToken("web", supervisor.Add(web.NewSutureService(globalCtx, r.c.Web))) + //addServiceToken("webdav", supervisor.Add(webdav.NewSutureService(globalCtx, r.c.WebDAV))) + //addServiceToken("storage-frontend", supervisor.Add(storage.NewFrontend(globalCtx, inheritedOptions...))) + //addServiceToken("storage-gateway", supervisor.Add(storage.NewGateway(globalCtx, inheritedOptions...))) + //addServiceToken("storage-users", supervisor.Add(storage.NewUsersProviderService(globalCtx, inheritedOptions...))) + //addServiceToken("storage-groupsprovider", supervisor.Add(storage.NewGroupsProvider(globalCtx, inheritedOptions...))) + //addServiceToken("storage-authbasic", supervisor.Add(storage.NewAuthBasic(globalCtx, inheritedOptions...))) + //addServiceToken("storage-authbearer", supervisor.Add(storage.NewAuthBearer(globalCtx, inheritedOptions...))) + //addServiceToken("storage-home", supervisor.Add(storage.NewStorageHome(globalCtx, inheritedOptions...))) + //addServiceToken("storage-users", supervisor.Add(storage.NewStorageUsers(globalCtx, inheritedOptions...))) + //addServiceToken("storage-public-link", supervisor.Add(storage.NewStoragePublicLink(globalCtx, inheritedOptions...))) + //addServiceToken("storage-sharing", supervisor.Add(storage.NewSharing(globalCtx, inheritedOptions...))) + // + //// TODO(refs) debug line with supervised services. + //go supervisor.ServeBackground() + // + //<-halt + // + //globalCancel() + //close(halt) return nil } diff --git a/ocis/pkg/runtime/service/option.go b/ocis/pkg/runtime/service/option.go index 5e8e88f532..e068181c20 100644 --- a/ocis/pkg/runtime/service/option.go +++ b/ocis/pkg/runtime/service/option.go @@ -1,5 +1,9 @@ package service +import ( + "github.com/owncloud/ocis/ocis/pkg/config" +) + // Log configures a structure logger. type Log struct { Pretty bool @@ -7,7 +11,8 @@ type Log struct { // Options are the configurable options for a Service. type Options struct { - Log *Log + Log *Log + Config *config.Config } // Option represents an option. @@ -26,3 +31,10 @@ func WithLogPretty(pretty bool) Option { o.Log.Pretty = pretty } } + +// WithConfig sets Controller config. +func WithConfig(cfg *config.Config) Option { + return func(o *Options) { + o.Config = cfg + } +} diff --git a/ocis/pkg/runtime/service/service.go b/ocis/pkg/runtime/service/service.go index 731e1f0836..20da924bae 100644 --- a/ocis/pkg/runtime/service/service.go +++ b/ocis/pkg/runtime/service/service.go @@ -1,6 +1,7 @@ package service import ( + "context" "fmt" "net" "net/http" @@ -11,10 +12,13 @@ import ( "sync" "syscall" + "github.com/thejerf/suture" + + settings "github.com/owncloud/ocis/settings/pkg/command" + + ociscfg "github.com/owncloud/ocis/ocis/pkg/config" "github.com/owncloud/ocis/ocis/pkg/runtime/config" - "github.com/owncloud/ocis/ocis/pkg/runtime/controller" "github.com/owncloud/ocis/ocis/pkg/runtime/log" - "github.com/owncloud/ocis/ocis/pkg/runtime/process" "github.com/rs/zerolog" "github.com/spf13/viper" ) @@ -26,10 +30,21 @@ var ( // Service represents a RPC service. type Service struct { - Controller controller.Controller - Log zerolog.Logger - wg *sync.WaitGroup - done bool + Supervisor *suture.Supervisor + ServicesRegistry map[string]func(context.Context, interface{}) suture.Service + serviceToken map[string][]suture.ServiceToken + context context.Context + cancel context.CancelFunc + Log zerolog.Logger + wg *sync.WaitGroup + done bool + cfg *ociscfg.Config +} + +// TODO(refs) think of a less confusing naming +type RuntimeSutureService struct { + Context context.Context + CancelFunc context.CancelFunc } // loadFromEnv would set cmd global variables. This is a workaround spf13/viper since pman used as a library does not @@ -37,16 +52,9 @@ type Service struct { func loadFromEnv() (*config.Config, error) { cfg := config.NewConfig() viper.AutomaticEnv() - - if err := viper.BindEnv("keep-alive", "RUNTIME_KEEP_ALIVE"); err != nil { - return nil, err - } if err := viper.BindEnv("port", "RUNTIME_PORT"); err != nil { return nil, err } - - cfg.KeepAlive = viper.GetBool("keep-alive") - if viper.GetString("port") != "" { cfg.Port = viper.GetString("port") } @@ -66,39 +74,50 @@ func NewService(options ...Option) (*Service, error) { f(opts) } - cfg, err := loadFromEnv() - if err != nil { - return nil, err - } l := log.NewLogger( log.WithPretty(opts.Log.Pretty), ) - return &Service{ - wg: &sync.WaitGroup{}, - Log: l, - Controller: controller.NewController( - controller.WithConfig(cfg), - controller.WithLog(&l), - ), - }, nil + globalCtx, cancelGlobal := context.WithCancel(context.Background()) + + s := &Service{ + ServicesRegistry: make(map[string]func(context.Context, interface{}) suture.Service), + Log: l, + + serviceToken: make(map[string][]suture.ServiceToken), + context: globalCtx, + cancel: cancelGlobal, + wg: &sync.WaitGroup{}, + cfg: opts.Config, + } + + s.ServicesRegistry["settings"] = settings.NewSutureService + + return s, nil } // Start an rpc service. func Start(o ...Option) error { s, err := NewService(o...) if err != nil { - s.Log.Fatal().Err(err) + if s != nil { + s.Log.Fatal().Err(err) + } } + s.Supervisor = suture.NewSimple("ocis") + if err := rpc.Register(s); err != nil { - s.Log.Fatal().Err(err) + if s != nil { + s.Log.Fatal().Err(err) + } } rpc.HandleHTTP() signal.Notify(halt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGHUP) - l, err := net.Listen("tcp", fmt.Sprintf("%v:%v", s.Controller.Config.Hostname, s.Controller.Config.Port)) + // TODO(refs) change default port + l, err := net.Listen("tcp", fmt.Sprintf("%v:%v", "localhost", "6060")) if err != nil { s.Log.Fatal().Err(err) } @@ -107,7 +126,8 @@ func Start(o ...Option) error { defer func() { if r := recover(); r != nil { reason := strings.Builder{} - if _, err := net.Dial("localhost", s.Controller.Config.Port); err != nil { + // TODO(refs) change default port + if _, err := net.Dial("localhost", "6060"); err != nil { reason.WriteString("runtime address already in use") } @@ -115,45 +135,42 @@ func Start(o ...Option) error { } }() + for k, _ := range s.ServicesRegistry { + s.serviceToken[k] = append(s.serviceToken[k], s.Supervisor.Add(s.ServicesRegistry[k](s.context, s.cfg.Settings))) + } + + go s.Supervisor.ServeBackground() go trap(s) return http.Serve(l, nil) } // Start indicates the Service Controller to start a new supervised service as an OS thread. -func (s *Service) Start(args process.ProcEntry, reply *int) error { - if !s.done { - s.wg.Add(1) - s.Log.Info().Str("service", args.Extension).Msgf("%v", "started") - if err := s.Controller.Start(args); err != nil { - *reply = 1 - return err - } - - *reply = 0 - s.wg.Done() +func (s *Service) Start(name string, reply *int) error { + if _, ok := s.ServicesRegistry[name]; !ok { + *reply = 1 + return nil } - + //s.serviceToken[name] = append(s.serviceToken[name], s.Supervisor.Add(s.ServicesRegistry[name])) + s.serviceToken["settings"] = append(s.serviceToken[name], s.Supervisor.Add(settings.NewSutureService(s.context, s.cfg.Settings))) + *reply = 0 return nil } // List running processes for the Service Controller. func (s *Service) List(args struct{}, reply *string) error { - *reply = s.Controller.List() return nil } // Kill a supervised process by subcommand name. -func (s *Service) Kill(args *string, reply *int) error { - pe := process.ProcEntry{ - Extension: *args, +func (s *Service) Kill(name string, reply *int) error { + if len(s.serviceToken[name]) > 0 { + for _, v := range s.serviceToken[name] { + if err := s.Supervisor.Remove(v); err != nil { + return err + } + } } - if err := s.Controller.Kill(pe); err != nil { - *reply = 1 - return err - } - - *reply = 0 return nil } @@ -163,12 +180,8 @@ func trap(s *Service) { <-halt s.done = true s.wg.Wait() - s.Log.Debug(). - Str("service", "runtime service"). - Msgf("terminating with signal: %v", s) - if err := s.Controller.Shutdown(done); err != nil { - s.Log.Err(err) - } + s.cancel() + s.Log.Debug().Str("service", "runtime service").Msgf("terminating with signal: %v", s) close(done) os.Exit(0) } diff --git a/settings/go.mod b/settings/go.mod index cc5ea762e4..90c262deb5 100644 --- a/settings/go.mod +++ b/settings/go.mod @@ -22,6 +22,7 @@ require ( github.com/prometheus/client_golang v1.7.1 github.com/spf13/viper v1.7.0 github.com/stretchr/testify v1.7.0 + github.com/thejerf/suture v4.0.0+incompatible go.opencensus.io v0.22.6 golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad // indirect golang.org/x/mod v0.4.1 // indirect diff --git a/settings/go.sum b/settings/go.sum index 267bfcd17e..16eaef2198 100644 --- a/settings/go.sum +++ b/settings/go.sum @@ -1291,6 +1291,8 @@ github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA= github.com/technoweenie/multipartstreamer v1.0.1/go.mod h1:jNVxdtShOxzAsukZwTSw6MDx5eUJoiEBsSvzDU9uzog= +github.com/thejerf/suture v4.0.0+incompatible h1:luAwgEo87y1X30wEYa64N4SKMrsAm9qXRwNxnLVuuwg= +github.com/thejerf/suture v4.0.0+incompatible/go.mod h1:ibKwrVj+Uzf3XZdAiNWUouPaAbSoemxOHLmJmwheEMc= github.com/tidwall/gjson v1.3.2/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls= github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= diff --git a/settings/pkg/command/root.go b/settings/pkg/command/root.go index fcb077626f..121e0b2ae6 100644 --- a/settings/pkg/command/root.go +++ b/settings/pkg/command/root.go @@ -5,6 +5,8 @@ import ( "os" "strings" + "github.com/thejerf/suture" + "github.com/micro/cli/v2" "github.com/owncloud/ocis/ocis-pkg/log" "github.com/owncloud/ocis/settings/pkg/config" @@ -111,18 +113,18 @@ func ParseConfig(c *cli.Context, cfg *config.Config) error { // SutureService allows for the settings command to be embedded and supervised by a suture supervisor tree. type SutureService struct { ctx context.Context - cancel context.CancelFunc // used to cancel the context go-micro services used to shutdown a service. + cancel context.CancelFunc cfg *config.Config } // NewSutureService creates a new settings.SutureService -func NewSutureService(ctx context.Context, cfg *config.Config) SutureService { +func NewSutureService(ctx context.Context, cfg interface{}) suture.Service { sctx, cancel := context.WithCancel(ctx) - cfg.Context = sctx // propagate the context down to the go-micro services. + cfg.(*config.Config).Context = sctx return SutureService{ ctx: sctx, cancel: cancel, - cfg: cfg, + cfg: cfg.(*config.Config), } }