mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-01-05 19:59:37 -06:00
remove dead runtime code
This commit is contained in:
@@ -1,168 +0,0 @@
|
||||
package controller
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/owncloud/ocis/ocis/pkg/runtime/config"
|
||||
"github.com/owncloud/ocis/ocis/pkg/runtime/process"
|
||||
"github.com/owncloud/ocis/ocis/pkg/runtime/storage"
|
||||
"github.com/owncloud/ocis/ocis/pkg/runtime/watcher"
|
||||
"github.com/rs/zerolog"
|
||||
|
||||
"github.com/olekukonko/tablewriter"
|
||||
)
|
||||
|
||||
// Controller supervises processes.
|
||||
type Controller struct {
|
||||
m *sync.RWMutex
|
||||
options Options
|
||||
log zerolog.Logger
|
||||
Config *config.Config
|
||||
|
||||
Store storage.Storage
|
||||
|
||||
// Bin is the oCIS single binary name.
|
||||
Bin string
|
||||
|
||||
// BinPath is the oCIS single binary path withing the host machine.
|
||||
// The Controller needs to know the binary location in order to spawn new extensions.
|
||||
BinPath string
|
||||
|
||||
// Terminated facilitates communication from Watcher <-> Controller. Writes to this
|
||||
// channel WILL always attempt to restart the crashed process.
|
||||
Terminated chan process.ProcEntry
|
||||
}
|
||||
|
||||
var (
|
||||
once = sync.Once{}
|
||||
)
|
||||
|
||||
// NewController initializes a new controller.
|
||||
func NewController(o ...Option) Controller {
|
||||
opts := &Options{}
|
||||
|
||||
for _, f := range o {
|
||||
f(opts)
|
||||
}
|
||||
|
||||
c := Controller{
|
||||
m: &sync.RWMutex{},
|
||||
options: *opts,
|
||||
log: *opts.Log,
|
||||
Bin: "ocis",
|
||||
Terminated: make(chan process.ProcEntry),
|
||||
Store: storage.NewMapStorage(),
|
||||
|
||||
Config: opts.Config,
|
||||
}
|
||||
|
||||
if opts.Bin != "" {
|
||||
c.Bin = opts.Bin
|
||||
}
|
||||
|
||||
// Get binary location from $PATH lookup. If not present, it uses arg[0] as entry point.
|
||||
path, err := exec.LookPath(c.Bin)
|
||||
if err != nil {
|
||||
c.log.Debug().Msg("oCIS binary not present in PATH, using Args[0]")
|
||||
path = os.Args[0]
|
||||
}
|
||||
c.BinPath = path
|
||||
return c
|
||||
}
|
||||
|
||||
// Start and watches a process.
|
||||
func (c *Controller) Start(pe process.ProcEntry) error {
|
||||
if pid := c.Store.Load(pe.Extension); pid != 0 {
|
||||
c.log.Debug().Msg(fmt.Sprintf("extension already running: %s", pe.Extension))
|
||||
return nil
|
||||
}
|
||||
|
||||
w := watcher.NewWatcher()
|
||||
if err := pe.Start(c.BinPath); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// store the spawned child process PID.
|
||||
if err := c.Store.Store(pe); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.Follow(pe, c.Terminated, c.options.Config.KeepAlive)
|
||||
|
||||
once.Do(func() {
|
||||
j := janitor{
|
||||
time.Second,
|
||||
c.Store,
|
||||
}
|
||||
|
||||
go j.run()
|
||||
go detach(c)
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
// Kill a managed process.
|
||||
// Should a process managed by the runtime be allowed to be killed if the runtime is configured not to?
|
||||
func (c *Controller) Kill(pe process.ProcEntry) error {
|
||||
// load stored PID
|
||||
pid := c.Store.Load(pe.Extension)
|
||||
|
||||
// find process in host by PID
|
||||
p, err := os.FindProcess(pid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := c.Store.Delete(pe); err != nil {
|
||||
return err
|
||||
}
|
||||
c.log.Info().Str("package", "watcher").Msgf("terminating %v", pe.Extension)
|
||||
|
||||
// terminate child process
|
||||
return p.Kill()
|
||||
}
|
||||
|
||||
// Shutdown a running runtime.
|
||||
func (c *Controller) Shutdown(ch chan struct{}) error {
|
||||
entries := c.Store.LoadAll()
|
||||
for cmd, pid := range entries {
|
||||
c.log.Info().Str("package", "watcher").Msgf("gracefully terminating %v", cmd)
|
||||
p, _ := os.FindProcess(pid)
|
||||
if err := p.Kill(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
ch <- struct{}{}
|
||||
return nil
|
||||
}
|
||||
|
||||
// List managed processes.
|
||||
func (c *Controller) List() string {
|
||||
tableString := &strings.Builder{}
|
||||
table := tablewriter.NewWriter(tableString)
|
||||
table.SetHeader([]string{"Extension", "PID"})
|
||||
|
||||
entries := c.Store.LoadAll()
|
||||
|
||||
keys := make([]string, 0, len(entries))
|
||||
for k := range entries {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
|
||||
sort.Strings(keys)
|
||||
|
||||
for _, v := range keys {
|
||||
table.Append([]string{v, strconv.Itoa(entries[v])})
|
||||
}
|
||||
|
||||
table.Render()
|
||||
return tableString.String()
|
||||
}
|
||||
@@ -1,49 +0,0 @@
|
||||
package controller
|
||||
|
||||
import (
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/owncloud/ocis/ocis/pkg/runtime/process"
|
||||
"github.com/owncloud/ocis/ocis/pkg/runtime/storage"
|
||||
)
|
||||
|
||||
type janitor struct {
|
||||
// interval at which db is cleared.
|
||||
interval time.Duration
|
||||
|
||||
store storage.Storage
|
||||
}
|
||||
|
||||
func (j *janitor) run() {
|
||||
ticker := time.NewTicker(j.interval)
|
||||
work := make(chan os.Signal, 1)
|
||||
signal.Notify(work, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-work:
|
||||
return
|
||||
case <-ticker.C:
|
||||
j.cleanup()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// cleanup removes orphaned extension + pid that were killed via SIGKILL given the nature of is being un-catchable,
|
||||
// the only way to update pman's database is by polling.
|
||||
func (j *janitor) cleanup() {
|
||||
for name, pid := range j.store.LoadAll() {
|
||||
// On unix like systems (linux, freebsd, etc) os.FindProcess will never return an error
|
||||
if p, err := os.FindProcess(pid); err == nil {
|
||||
if err := p.Signal(syscall.Signal(0)); err != nil {
|
||||
_ = j.store.Delete(process.ProcEntry{
|
||||
Pid: pid,
|
||||
Extension: name,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,36 +0,0 @@
|
||||
package controller
|
||||
|
||||
import (
|
||||
"github.com/owncloud/ocis/ocis/pkg/runtime/config"
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
// Options are the configurable options for a Controller.
|
||||
type Options struct {
|
||||
Bin string
|
||||
Restart bool
|
||||
Config *config.Config
|
||||
Log *zerolog.Logger
|
||||
}
|
||||
|
||||
// Option represents an option.
|
||||
type Option func(o *Options)
|
||||
|
||||
// NewOptions returns a new Options struct.
|
||||
func NewOptions() Options {
|
||||
return Options{}
|
||||
}
|
||||
|
||||
// WithConfig sets Controller config.
|
||||
func WithConfig(cfg *config.Config) Option {
|
||||
return func(o *Options) {
|
||||
o.Config = cfg
|
||||
}
|
||||
}
|
||||
|
||||
// WithLog sets Controller config.
|
||||
func WithLog(l *zerolog.Logger) Option {
|
||||
return func(o *Options) {
|
||||
o.Log = l
|
||||
}
|
||||
}
|
||||
@@ -1,10 +0,0 @@
|
||||
package controller
|
||||
|
||||
// detach will try to restart processes on failures.
|
||||
func detach(c *Controller) {
|
||||
for proc := range c.Terminated {
|
||||
if err := c.Start(proc); err != nil {
|
||||
c.log.Err(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,30 +0,0 @@
|
||||
package log
|
||||
|
||||
import (
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
var (
|
||||
// Level sets a project wide log level
|
||||
Level zerolog.Level = zerolog.InfoLevel
|
||||
)
|
||||
|
||||
// NewLogger configures a logger.
|
||||
func NewLogger(options ...Option) zerolog.Logger {
|
||||
zerolog.SetGlobalLevel(Level)
|
||||
|
||||
o := NewOptions()
|
||||
for _, f := range options {
|
||||
f(o)
|
||||
}
|
||||
|
||||
logger := zerolog.New(os.Stdout).With().Timestamp().Logger()
|
||||
if o.Pretty {
|
||||
logger = logger.Output(zerolog.ConsoleWriter{Out: os.Stdout, TimeFormat: time.RFC3339})
|
||||
}
|
||||
|
||||
return logger
|
||||
}
|
||||
@@ -1,26 +0,0 @@
|
||||
package log
|
||||
|
||||
import "github.com/rs/zerolog"
|
||||
|
||||
// Options are the configurable options for a Controller.
|
||||
type Options struct {
|
||||
Level zerolog.Level
|
||||
Pretty bool
|
||||
}
|
||||
|
||||
// Option represents an option.
|
||||
type Option func(o *Options)
|
||||
|
||||
// NewOptions returns a new Options struct.
|
||||
func NewOptions() *Options {
|
||||
return &Options{
|
||||
Level: zerolog.DebugLevel,
|
||||
}
|
||||
}
|
||||
|
||||
// WithPretty sets the pretty option.
|
||||
func WithPretty(pretty bool) Option {
|
||||
return func(o *Options) {
|
||||
o.Pretty = pretty
|
||||
}
|
||||
}
|
||||
@@ -1,60 +0,0 @@
|
||||
// +build !windows
|
||||
|
||||
package process
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
sys "golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
// ProcEntry is an entry in the File db.
|
||||
type ProcEntry struct {
|
||||
Args []string
|
||||
Env []string
|
||||
Pid int
|
||||
Extension string
|
||||
}
|
||||
|
||||
// NewProcEntry returns a new ProcEntry.
|
||||
func NewProcEntry(extension string, env []string, args ...string) ProcEntry {
|
||||
return ProcEntry{
|
||||
Extension: extension,
|
||||
Args: args,
|
||||
Env: env,
|
||||
}
|
||||
}
|
||||
|
||||
// Start a process.
|
||||
func (e *ProcEntry) Start(binPath string) error {
|
||||
var argv = []string{binPath}
|
||||
argv = append(argv, e.Args...)
|
||||
|
||||
p, err := os.StartProcess(binPath, argv, &os.ProcAttr{
|
||||
Files: []*os.File{
|
||||
os.Stdin,
|
||||
os.Stdout,
|
||||
os.Stderr,
|
||||
},
|
||||
Env: e.Env,
|
||||
Sys: &sys.SysProcAttr{
|
||||
Setpgid: true,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.Pid = p.Pid
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Kill the wrapped process.
|
||||
func (e *ProcEntry) Kill() error {
|
||||
p, err := os.FindProcess(e.Pid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return p.Kill()
|
||||
}
|
||||
@@ -1,56 +0,0 @@
|
||||
// +build windows
|
||||
|
||||
package process
|
||||
|
||||
import (
|
||||
"os"
|
||||
)
|
||||
|
||||
// ProcEntry is an entry in the File db.
|
||||
type ProcEntry struct {
|
||||
Args []string
|
||||
Env []string
|
||||
Pid int
|
||||
Extension string
|
||||
}
|
||||
|
||||
// NewProcEntry returns a new ProcEntry.
|
||||
func NewProcEntry(extension string, env []string, args ...string) ProcEntry {
|
||||
return ProcEntry{
|
||||
Extension: extension,
|
||||
Args: args,
|
||||
Env: env,
|
||||
}
|
||||
}
|
||||
|
||||
// Start a process.
|
||||
func (e *ProcEntry) Start(binPath string) error {
|
||||
var argv = []string{binPath}
|
||||
argv = append(argv, e.Args...)
|
||||
|
||||
p, err := os.StartProcess(binPath, argv, &os.ProcAttr{
|
||||
Files: []*os.File{
|
||||
os.Stdin,
|
||||
os.Stdout,
|
||||
os.Stderr,
|
||||
},
|
||||
Env: e.Env,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
e.Pid = p.Pid
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Kill the wrapped process.
|
||||
func (e *ProcEntry) Kill() error {
|
||||
p, err := os.FindProcess(e.Pid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return p.Kill()
|
||||
}
|
||||
@@ -1,64 +0,0 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"github.com/owncloud/ocis/ocis/pkg/runtime/process"
|
||||
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Map synchronizes access to extension+pid tuples.
|
||||
type Map struct {
|
||||
c *sync.Map
|
||||
}
|
||||
|
||||
// NewMapStorage initializes a new Storage.
|
||||
func NewMapStorage() Storage {
|
||||
return &Map{
|
||||
c: &sync.Map{},
|
||||
}
|
||||
}
|
||||
|
||||
// Store a value on the underlying data structure.
|
||||
func (m *Map) Store(e process.ProcEntry) error {
|
||||
m.c.Store(e.Extension, e.Pid)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete a value on the underlying data structure.
|
||||
func (m *Map) Delete(e process.ProcEntry) error {
|
||||
m.c.Delete(e.Extension)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Load a single pid.
|
||||
func (m *Map) Load(name string) int {
|
||||
var val int
|
||||
m.c.Range(func(k, v interface{}) bool {
|
||||
if k.(string) == name {
|
||||
val = v.(int)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
})
|
||||
return val
|
||||
}
|
||||
|
||||
// LoadAll values from the underlying data structure.
|
||||
func (m *Map) LoadAll() Entries {
|
||||
e := make(map[string]int)
|
||||
m.c.Range(func(k, v interface{}) bool {
|
||||
ks, ok := k.(string)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
vs, ok := v.(int)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
e[ks] = vs
|
||||
return true
|
||||
})
|
||||
return e
|
||||
}
|
||||
@@ -1,50 +0,0 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"os"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/owncloud/ocis/ocis/pkg/runtime/process"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
if err := loadStore(); err != nil {
|
||||
os.Exit(1)
|
||||
}
|
||||
os.Exit(m.Run())
|
||||
}
|
||||
|
||||
var (
|
||||
store = NewMapStorage()
|
||||
)
|
||||
|
||||
func loadStore() error {
|
||||
for i := 0; i < 20; i++ {
|
||||
if err := store.Store(process.ProcEntry{
|
||||
Pid: rand.Int(), //nolint:gosec
|
||||
Extension: fmt.Sprintf("extension-%s", strconv.Itoa(i)),
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestLoadAll(t *testing.T) {
|
||||
all := store.LoadAll()
|
||||
assert.NotNil(t, all["extension-1"])
|
||||
}
|
||||
|
||||
func TestDelete(t *testing.T) {
|
||||
err := store.Delete(process.ProcEntry{
|
||||
Extension: "extension-1",
|
||||
})
|
||||
assert.Nil(t, err)
|
||||
all := store.LoadAll()
|
||||
assert.Zero(t, all["extension-1"])
|
||||
}
|
||||
@@ -1,21 +0,0 @@
|
||||
package storage
|
||||
|
||||
import "github.com/owncloud/ocis/ocis/pkg/runtime/process"
|
||||
|
||||
// Entries is a tuple of <extension:pid>
|
||||
type Entries map[string]int
|
||||
|
||||
// Storage defines a basic persistence interface layer.
|
||||
type Storage interface {
|
||||
// Store a representation of a process.
|
||||
Store(e process.ProcEntry) error
|
||||
|
||||
// Delete a representation of a process.
|
||||
Delete(e process.ProcEntry) error
|
||||
|
||||
// Load a single entry.
|
||||
Load(name string) int
|
||||
|
||||
// LoadAll retrieves a set of entries of running processes on the host machine.
|
||||
LoadAll() Entries
|
||||
}
|
||||
@@ -1,55 +0,0 @@
|
||||
package watcher
|
||||
|
||||
import (
|
||||
golog "log"
|
||||
"os"
|
||||
|
||||
"github.com/owncloud/ocis/ocis/pkg/runtime/log"
|
||||
"github.com/owncloud/ocis/ocis/pkg/runtime/process"
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
// Watcher watches a process and sends messages using channels.
|
||||
type Watcher struct {
|
||||
log zerolog.Logger
|
||||
}
|
||||
|
||||
// NewWatcher initializes a watcher.
|
||||
func NewWatcher() Watcher {
|
||||
return Watcher{
|
||||
log: log.NewLogger(log.WithPretty(true)),
|
||||
}
|
||||
}
|
||||
|
||||
// Follow a process until it dies. If restart is enabled, a new fork of the original process will be automatically spawned.
|
||||
func (w *Watcher) Follow(pe process.ProcEntry, followerChan chan process.ProcEntry, restart bool) {
|
||||
state := make(chan *os.ProcessState, 1)
|
||||
|
||||
w.log.Debug().Str("package", "watcher").Msgf("watching %v", pe.Extension)
|
||||
go func() {
|
||||
ps, err := watch(pe.Pid)
|
||||
if err != nil {
|
||||
golog.Fatal(err)
|
||||
}
|
||||
|
||||
state <- ps
|
||||
}()
|
||||
|
||||
go func() {
|
||||
status := <-state
|
||||
w.log.Info().Str("package", "watcher").Msgf("%v exited with: %v", pe.Extension, status)
|
||||
if restart {
|
||||
followerChan <- pe
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// watch a process by its pid. This operation blocks.
|
||||
func watch(pid int) (*os.ProcessState, error) {
|
||||
p, err := os.FindProcess(pid)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return p.Wait()
|
||||
}
|
||||
Reference in New Issue
Block a user