This commit is contained in:
A.Unger
2021-03-09 13:10:05 +01:00
parent 9284c232b8
commit 757cb9abe9
9 changed files with 162 additions and 127 deletions
+1 -1
View File
@@ -27,7 +27,7 @@ func KillCommand(cfg *config.Config) *cli.Command {
},
&cli.StringFlag{
Name: "port",
Value: "10666",
Value: "6060",
EnvVars: []string{"OCIS_RUNTIME_PORT"},
Destination: &cfg.Runtime.Port,
},
+18 -8
View File
@@ -1,6 +1,11 @@
package command
import (
"fmt"
"log"
"net"
"net/rpc"
cli "github.com/micro/cli/v2"
"github.com/owncloud/ocis/ocis/pkg/config"
@@ -22,20 +27,25 @@ func RunCommand(cfg *config.Config) *cli.Command {
},
&cli.StringFlag{
Name: "port",
Value: "10666",
Value: "6060",
EnvVars: []string{"OCIS_RUNTIME_PORT"},
Destination: &cfg.Runtime.Port,
},
},
Action: func(c *cli.Context) error {
// TODO(refs) this implementation changes as we don't depend on os threads anymore.
//client, err := rpc.DialHTTP("tcp", net.JoinHostPort(cfg.Runtime.Hostname, cfg.Runtime.Port))
//if err != nil {
// log.Fatal("dialing:", err)
//}
//
//res := runtime.RunService(client, os.Args[2])
//fmt.Println(res)
client, err := rpc.DialHTTP("tcp", net.JoinHostPort(cfg.Runtime.Hostname, cfg.Runtime.Port))
if err != nil {
log.Fatal("dialing:", err)
}
var reply int
if err := client.Call("Service.Start", "settings", &reply); err != nil {
log.Fatal(err)
}
fmt.Println(reply)
return nil
},
}
+50 -56
View File
@@ -1,25 +1,12 @@
package runtime
import (
"context"
"os"
"os/signal"
mzlog "github.com/asim/go-micro/plugins/logger/zerolog/v3"
"github.com/asim/go-micro/v3/logger"
accounts "github.com/owncloud/ocis/accounts/pkg/command"
glauth "github.com/owncloud/ocis/glauth/pkg/command"
idp "github.com/owncloud/ocis/idp/pkg/command"
"github.com/owncloud/ocis/ocis/pkg/config"
ocs "github.com/owncloud/ocis/ocs/pkg/command"
onlyoffice "github.com/owncloud/ocis/onlyoffice/pkg/command"
proxy "github.com/owncloud/ocis/proxy/pkg/command"
settings "github.com/owncloud/ocis/settings/pkg/command"
storage "github.com/owncloud/ocis/storage/pkg/command"
store "github.com/owncloud/ocis/store/pkg/command"
thumbnails "github.com/owncloud/ocis/thumbnails/pkg/command"
web "github.com/owncloud/ocis/web/pkg/command"
webdav "github.com/owncloud/ocis/webdav/pkg/command"
"github.com/owncloud/ocis/ocis/pkg/runtime/service"
"github.com/rs/zerolog"
"github.com/thejerf/suture"
)
@@ -45,51 +32,58 @@ type serviceTokens map[string][]suture.ServiceToken
var tokens = serviceTokens{}
// Start rpc runtime
// TODO(refs) initialize a new Service struct
// - run the set of default services under the server command.
func (r *Runtime) Start() error {
setMicroLogger(r.c.Log)
halt := make(chan os.Signal, 1)
signal.Notify(halt, os.Interrupt)
// 1. create new Service struct
// 2. start
supervisor := suture.NewSimple("ocis")
globalCtx, globalCancel := context.WithCancel(context.Background())
return service.Start(service.WithConfig(r.c))
// propagate reva log config to storage services
inheritedOptions := []storage.Option{
storage.WithLogPretty(r.c.Log.Pretty),
storage.WithLogColor(r.c.Log.Color),
storage.WithLogLevel(r.c.Log.Level),
}
addServiceToken("settings", supervisor.Add(settings.NewSutureService(globalCtx, r.c.Settings)))
addServiceToken("storage-metadata", supervisor.Add(storage.NewStorageMetadata(globalCtx, inheritedOptions...)))
addServiceToken("accounts", supervisor.Add(accounts.NewSutureService(globalCtx, r.c.Accounts)))
addServiceToken("glauth", supervisor.Add(glauth.NewSutureService(globalCtx, r.c.GLAuth)))
addServiceToken("idp", supervisor.Add(idp.NewSutureService(globalCtx, r.c.IDP)))
addServiceToken("ocs", supervisor.Add(ocs.NewSutureService(globalCtx, r.c.OCS)))
addServiceToken("onlyoffice", supervisor.Add(onlyoffice.NewSutureService(globalCtx, r.c.Onlyoffice)))
addServiceToken("proxy", supervisor.Add(proxy.NewSutureService(globalCtx, r.c.Proxy)))
addServiceToken("store", supervisor.Add(store.NewSutureService(globalCtx, r.c.Store)))
addServiceToken("thumbnails", supervisor.Add(thumbnails.NewSutureService(globalCtx, r.c.Thumbnails)))
addServiceToken("web", supervisor.Add(web.NewSutureService(globalCtx, r.c.Web)))
addServiceToken("webdav", supervisor.Add(webdav.NewSutureService(globalCtx, r.c.WebDAV)))
addServiceToken("storage-frontend", supervisor.Add(storage.NewFrontend(globalCtx, inheritedOptions...)))
addServiceToken("storage-gateway", supervisor.Add(storage.NewGateway(globalCtx, inheritedOptions...)))
addServiceToken("storage-users", supervisor.Add(storage.NewUsersProviderService(globalCtx, inheritedOptions...)))
addServiceToken("storage-groupsprovider", supervisor.Add(storage.NewGroupsProvider(globalCtx, inheritedOptions...))) // TODO(refs) panic? are we sending to a nil / closed channel?
addServiceToken("storage-authbasic", supervisor.Add(storage.NewAuthBasic(globalCtx, inheritedOptions...)))
addServiceToken("storage-authbearer", supervisor.Add(storage.NewAuthBearer(globalCtx, inheritedOptions...)))
addServiceToken("storage-home", supervisor.Add(storage.NewStorageHome(globalCtx, inheritedOptions...)))
addServiceToken("storage-users", supervisor.Add(storage.NewStorageUsers(globalCtx, inheritedOptions...)))
addServiceToken("storage-public-link", supervisor.Add(storage.NewStoragePublicLink(globalCtx, inheritedOptions...)))
addServiceToken("storage-sharing", supervisor.Add(storage.NewSharing(globalCtx, inheritedOptions...)))
// TODO(refs) debug line with supervised services.
go supervisor.ServeBackground()
<-halt
globalCancel()
close(halt)
//setMicroLogger(r.c.Log)
//halt := make(chan os.Signal, 1)
//signal.Notify(halt, os.Interrupt)
//
//supervisor := suture.NewSimple("ocis")
//globalCtx, globalCancel := context.WithCancel(context.Background())
//
//// propagate reva log config to storage services
//inheritedOptions := []storage.Option{
// storage.WithLogPretty(r.c.Log.Pretty),
// storage.WithLogColor(r.c.Log.Color),
// storage.WithLogLevel(r.c.Log.Level),
//}
//
//addServiceToken("settings", supervisor.Add(settings.NewSutureService(globalCtx, r.c.Settings)))
//addServiceToken("storage-metadata", supervisor.Add(storage.NewStorageMetadata(globalCtx, inheritedOptions...)))
//addServiceToken("accounts", supervisor.Add(accounts.NewSutureService(globalCtx, r.c.Accounts)))
//addServiceToken("glauth", supervisor.Add(glauth.NewSutureService(globalCtx, r.c.GLAuth)))
//addServiceToken("idp", supervisor.Add(idp.NewSutureService(globalCtx, r.c.IDP)))
//addServiceToken("ocs", supervisor.Add(ocs.NewSutureService(globalCtx, r.c.OCS)))
//addServiceToken("onlyoffice", supervisor.Add(onlyoffice.NewSutureService(globalCtx, r.c.Onlyoffice)))
//addServiceToken("proxy", supervisor.Add(proxy.NewSutureService(globalCtx, r.c.Proxy)))
//addServiceToken("store", supervisor.Add(store.NewSutureService(globalCtx, r.c.Store)))
//addServiceToken("thumbnails", supervisor.Add(thumbnails.NewSutureService(globalCtx, r.c.Thumbnails)))
//addServiceToken("web", supervisor.Add(web.NewSutureService(globalCtx, r.c.Web)))
//addServiceToken("webdav", supervisor.Add(webdav.NewSutureService(globalCtx, r.c.WebDAV)))
//addServiceToken("storage-frontend", supervisor.Add(storage.NewFrontend(globalCtx, inheritedOptions...)))
//addServiceToken("storage-gateway", supervisor.Add(storage.NewGateway(globalCtx, inheritedOptions...)))
//addServiceToken("storage-users", supervisor.Add(storage.NewUsersProviderService(globalCtx, inheritedOptions...)))
//addServiceToken("storage-groupsprovider", supervisor.Add(storage.NewGroupsProvider(globalCtx, inheritedOptions...)))
//addServiceToken("storage-authbasic", supervisor.Add(storage.NewAuthBasic(globalCtx, inheritedOptions...)))
//addServiceToken("storage-authbearer", supervisor.Add(storage.NewAuthBearer(globalCtx, inheritedOptions...)))
//addServiceToken("storage-home", supervisor.Add(storage.NewStorageHome(globalCtx, inheritedOptions...)))
//addServiceToken("storage-users", supervisor.Add(storage.NewStorageUsers(globalCtx, inheritedOptions...)))
//addServiceToken("storage-public-link", supervisor.Add(storage.NewStoragePublicLink(globalCtx, inheritedOptions...)))
//addServiceToken("storage-sharing", supervisor.Add(storage.NewSharing(globalCtx, inheritedOptions...)))
//
//// TODO(refs) debug line with supervised services.
//go supervisor.ServeBackground()
//
//<-halt
//
//globalCancel()
//close(halt)
return nil
}
+13 -1
View File
@@ -1,5 +1,9 @@
package service
import (
"github.com/owncloud/ocis/ocis/pkg/config"
)
// Log configures a structure logger.
type Log struct {
Pretty bool
@@ -7,7 +11,8 @@ type Log struct {
// Options are the configurable options for a Service.
type Options struct {
Log *Log
Log *Log
Config *config.Config
}
// Option represents an option.
@@ -26,3 +31,10 @@ func WithLogPretty(pretty bool) Option {
o.Log.Pretty = pretty
}
}
// WithConfig sets Controller config.
func WithConfig(cfg *config.Config) Option {
return func(o *Options) {
o.Config = cfg
}
}
+70 -57
View File
@@ -1,6 +1,7 @@
package service
import (
"context"
"fmt"
"net"
"net/http"
@@ -11,10 +12,13 @@ import (
"sync"
"syscall"
"github.com/thejerf/suture"
settings "github.com/owncloud/ocis/settings/pkg/command"
ociscfg "github.com/owncloud/ocis/ocis/pkg/config"
"github.com/owncloud/ocis/ocis/pkg/runtime/config"
"github.com/owncloud/ocis/ocis/pkg/runtime/controller"
"github.com/owncloud/ocis/ocis/pkg/runtime/log"
"github.com/owncloud/ocis/ocis/pkg/runtime/process"
"github.com/rs/zerolog"
"github.com/spf13/viper"
)
@@ -26,10 +30,21 @@ var (
// Service represents a RPC service.
type Service struct {
Controller controller.Controller
Log zerolog.Logger
wg *sync.WaitGroup
done bool
Supervisor *suture.Supervisor
ServicesRegistry map[string]func(context.Context, interface{}) suture.Service
serviceToken map[string][]suture.ServiceToken
context context.Context
cancel context.CancelFunc
Log zerolog.Logger
wg *sync.WaitGroup
done bool
cfg *ociscfg.Config
}
// TODO(refs) think of a less confusing naming
type RuntimeSutureService struct {
Context context.Context
CancelFunc context.CancelFunc
}
// loadFromEnv would set cmd global variables. This is a workaround spf13/viper since pman used as a library does not
@@ -37,16 +52,9 @@ type Service struct {
func loadFromEnv() (*config.Config, error) {
cfg := config.NewConfig()
viper.AutomaticEnv()
if err := viper.BindEnv("keep-alive", "RUNTIME_KEEP_ALIVE"); err != nil {
return nil, err
}
if err := viper.BindEnv("port", "RUNTIME_PORT"); err != nil {
return nil, err
}
cfg.KeepAlive = viper.GetBool("keep-alive")
if viper.GetString("port") != "" {
cfg.Port = viper.GetString("port")
}
@@ -66,39 +74,50 @@ func NewService(options ...Option) (*Service, error) {
f(opts)
}
cfg, err := loadFromEnv()
if err != nil {
return nil, err
}
l := log.NewLogger(
log.WithPretty(opts.Log.Pretty),
)
return &Service{
wg: &sync.WaitGroup{},
Log: l,
Controller: controller.NewController(
controller.WithConfig(cfg),
controller.WithLog(&l),
),
}, nil
globalCtx, cancelGlobal := context.WithCancel(context.Background())
s := &Service{
ServicesRegistry: make(map[string]func(context.Context, interface{}) suture.Service),
Log: l,
serviceToken: make(map[string][]suture.ServiceToken),
context: globalCtx,
cancel: cancelGlobal,
wg: &sync.WaitGroup{},
cfg: opts.Config,
}
s.ServicesRegistry["settings"] = settings.NewSutureService
return s, nil
}
// Start an rpc service.
func Start(o ...Option) error {
s, err := NewService(o...)
if err != nil {
s.Log.Fatal().Err(err)
if s != nil {
s.Log.Fatal().Err(err)
}
}
s.Supervisor = suture.NewSimple("ocis")
if err := rpc.Register(s); err != nil {
s.Log.Fatal().Err(err)
if s != nil {
s.Log.Fatal().Err(err)
}
}
rpc.HandleHTTP()
signal.Notify(halt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGHUP)
l, err := net.Listen("tcp", fmt.Sprintf("%v:%v", s.Controller.Config.Hostname, s.Controller.Config.Port))
// TODO(refs) change default port
l, err := net.Listen("tcp", fmt.Sprintf("%v:%v", "localhost", "6060"))
if err != nil {
s.Log.Fatal().Err(err)
}
@@ -107,7 +126,8 @@ func Start(o ...Option) error {
defer func() {
if r := recover(); r != nil {
reason := strings.Builder{}
if _, err := net.Dial("localhost", s.Controller.Config.Port); err != nil {
// TODO(refs) change default port
if _, err := net.Dial("localhost", "6060"); err != nil {
reason.WriteString("runtime address already in use")
}
@@ -115,45 +135,42 @@ func Start(o ...Option) error {
}
}()
for k, _ := range s.ServicesRegistry {
s.serviceToken[k] = append(s.serviceToken[k], s.Supervisor.Add(s.ServicesRegistry[k](s.context, s.cfg.Settings)))
}
go s.Supervisor.ServeBackground()
go trap(s)
return http.Serve(l, nil)
}
// Start indicates the Service Controller to start a new supervised service as an OS thread.
func (s *Service) Start(args process.ProcEntry, reply *int) error {
if !s.done {
s.wg.Add(1)
s.Log.Info().Str("service", args.Extension).Msgf("%v", "started")
if err := s.Controller.Start(args); err != nil {
*reply = 1
return err
}
*reply = 0
s.wg.Done()
func (s *Service) Start(name string, reply *int) error {
if _, ok := s.ServicesRegistry[name]; !ok {
*reply = 1
return nil
}
//s.serviceToken[name] = append(s.serviceToken[name], s.Supervisor.Add(s.ServicesRegistry[name]))
s.serviceToken["settings"] = append(s.serviceToken[name], s.Supervisor.Add(settings.NewSutureService(s.context, s.cfg.Settings)))
*reply = 0
return nil
}
// List running processes for the Service Controller.
func (s *Service) List(args struct{}, reply *string) error {
*reply = s.Controller.List()
return nil
}
// Kill a supervised process by subcommand name.
func (s *Service) Kill(args *string, reply *int) error {
pe := process.ProcEntry{
Extension: *args,
func (s *Service) Kill(name string, reply *int) error {
if len(s.serviceToken[name]) > 0 {
for _, v := range s.serviceToken[name] {
if err := s.Supervisor.Remove(v); err != nil {
return err
}
}
}
if err := s.Controller.Kill(pe); err != nil {
*reply = 1
return err
}
*reply = 0
return nil
}
@@ -163,12 +180,8 @@ func trap(s *Service) {
<-halt
s.done = true
s.wg.Wait()
s.Log.Debug().
Str("service", "runtime service").
Msgf("terminating with signal: %v", s)
if err := s.Controller.Shutdown(done); err != nil {
s.Log.Err(err)
}
s.cancel()
s.Log.Debug().Str("service", "runtime service").Msgf("terminating with signal: %v", s)
close(done)
os.Exit(0)
}