From 775fa9e5a8c3f5fdeacd91ab00d3caac32d9c8f9 Mon Sep 17 00:00:00 2001 From: Alexander Baryshnikov Date: Fri, 22 Jun 2018 20:59:30 +0800 Subject: [PATCH] add REST api --- monexec/config.go | 6 +- plugins/README.md | 2 +- plugins/adp_consul.go | 2 +- plugins/adp_critical.go | 2 +- plugins/adp_email.go | 2 +- plugins/adp_http.go | 2 +- plugins/adp_rest.go | 131 ++++++++++++++++++++++++++++++++++++ plugins/adp_telegram.go | 2 +- plugins/plugin_interface.go | 3 +- pool/executable.go | 5 +- sample/sample6_rest.yaml | 10 +++ 11 files changed, 156 insertions(+), 11 deletions(-) create mode 100644 plugins/adp_rest.go create mode 100644 sample/sample6_rest.yaml diff --git a/monexec/config.go b/monexec/config.go index addf986..a3d369f 100644 --- a/monexec/config.go +++ b/monexec/config.go @@ -73,7 +73,7 @@ func (config *Config) Run(sv *pool.Pool, ctx context.Context) error { // Initialize plugins //-- prepare and add all plugins for pluginName, pluginInstance := range config.loadedPlugins { - err := pluginInstance.Prepare() + err := pluginInstance.Prepare(ctx, sv) if err != nil { log.Println("failed prepare plugin", pluginName, "-", err) } else { @@ -131,8 +131,8 @@ func LoadConfig(locations ...string) (*Config, error) { } var wrap = description - - if reflect.ValueOf(wrap).Type().Kind() == reflect.Slice { + refVal := reflect.ValueOf(wrap) + if wrap != nil && refVal.Type().Kind() == reflect.Slice { wrap = map[string]interface{}{ "": description, } diff --git a/plugins/README.md b/plugins/README.md index faa7586..e38d3b0 100644 --- a/plugins/README.md +++ b/plugins/README.md @@ -11,7 +11,7 @@ import ( type MyPlugin struct {} -func (p *MyPlugin) Prepare() error { return nil } +func (p *MyPlugin) Prepare(ctx context.Context, pl *pool.Pool) error { return nil } func (p *MyPlugin) OnSpawned(ctx context.Context, sv pool.Instance) {} diff --git a/plugins/adp_consul.go b/plugins/adp_consul.go index ce082f0..fb7e435 100644 --- a/plugins/adp_consul.go +++ b/plugins/adp_consul.go @@ -37,7 +37,7 @@ func DefaultConsul() ConsulPlugin { } } -func (p *ConsulPlugin) Prepare() error { +func (p *ConsulPlugin) Prepare(ctx context.Context, pl *pool.Pool) error { consulConfig := api.DefaultConfig() consulConfig.Address = p.URL consul, err := api.NewClient(consulConfig) diff --git a/plugins/adp_critical.go b/plugins/adp_critical.go index 516b901..35fc91d 100644 --- a/plugins/adp_critical.go +++ b/plugins/adp_critical.go @@ -34,7 +34,7 @@ func (a *Critical) MergeFrom(other interface{}) (error) { return nil } -func (a *Critical) Prepare() error { +func (a *Critical) Prepare(ctx context.Context, pl *pool.Pool) error { return nil } func (a *Critical) Close() error { return nil } diff --git a/plugins/adp_email.go b/plugins/adp_email.go index 81b4e57..2b74080 100644 --- a/plugins/adp_email.go +++ b/plugins/adp_email.go @@ -65,7 +65,7 @@ func (c *Email) OnStopped(ctx context.Context, sv pool.Instance, err error) { func (p *Email) OnFinished(ctx context.Context, sv pool.Instance) {} -func (c *Email) Prepare() error { +func (c *Email) Prepare(ctx context.Context, pl *pool.Pool) error { c.servicesSet = makeSet(c.Services) c.log = log.New(os.Stderr, "[email] ", log.LstdFlags) c.hostname, _ = os.Hostname() diff --git a/plugins/adp_http.go b/plugins/adp_http.go index 7d1260f..aa3e49b 100644 --- a/plugins/adp_http.go +++ b/plugins/adp_http.go @@ -88,7 +88,7 @@ func (c *Http) OnStopped(ctx context.Context, sv pool.Instance, err error) { func (p *Http) OnFinished(ctx context.Context, sv pool.Instance) {} func (a *Http) Close() error { return nil } -func (c *Http) Prepare() error { +func (c *Http) Prepare(ctx context.Context, pl *pool.Pool) error { c.servicesSet = makeSet(c.Services) c.log = log.New(os.Stderr, "[http] ", log.LstdFlags) if c.Method == "" { diff --git a/plugins/adp_rest.go b/plugins/adp_rest.go new file mode 100644 index 0000000..dcf2a84 --- /dev/null +++ b/plugins/adp_rest.go @@ -0,0 +1,131 @@ +package plugins + +import ( + "context" + "github.com/reddec/monexec/pool" + "github.com/pkg/errors" + "net/http" + "time" + "github.com/gin-gonic/gin" +) + +const restApiStartupCheck = 1 * time.Second + +type RestPlugin struct { + Listen string `yaml:"listen"` + server *http.Server +} + +func (p *RestPlugin) Prepare(ctx context.Context, pl *pool.Pool) error { + + router := gin.Default() + + router.GET("/supervisors", func(gctx *gin.Context) { + var names = make([]string, 0) + for _, sv := range pl.Supervisors() { + names = append(names, sv.Config().Name) + } + gctx.JSON(http.StatusOK, names) + }) + router.GET("/supervisor/:name", func(gctx *gin.Context) { + name := gctx.Param("name") + for _, sv := range pl.Supervisors() { + if sv.Config().Name == name { + gctx.JSON(http.StatusOK, sv.Config()) + return + } + } + gctx.AbortWithStatus(http.StatusNotFound) + }) + router.POST("/supervisor/:name", func(gctx *gin.Context) { + name := gctx.Param("name") + for _, sv := range pl.Supervisors() { + if sv.Config().Name == name { + in := pl.Start(ctx, sv) + gctx.JSON(http.StatusOK, in) + return + } + } + gctx.AbortWithStatus(http.StatusNotFound) + }) + router.GET("/instances", func(gctx *gin.Context) { + var names = make([]string, 0) + for _, sv := range pl.Instances() { + names = append(names, sv.Config().Name) + } + gctx.JSON(http.StatusOK, names) + }) + + router.GET("/instance/:name", func(gctx *gin.Context) { + name := gctx.Param("name") + for _, sv := range pl.Instances() { + if sv.Config().Name == name { + gctx.JSON(http.StatusOK, sv) + return + } + } + gctx.AbortWithStatus(http.StatusNotFound) + }) + + router.POST("/instance/:name", func(gctx *gin.Context) { + name := gctx.Param("name") + for _, sv := range pl.Instances() { + if sv.Config().Name == name { + pl.Stop(sv) + gctx.AbortWithStatus(http.StatusCreated) + return + } + } + gctx.AbortWithStatus(http.StatusNotFound) + }) + + p.server = &http.Server{Addr: p.Listen, Handler: router} + + start := make(chan error, 1) + go func() { + start <- p.server.ListenAndServe() + }() + select { + case err := <-start: + return err + case <-time.After(restApiStartupCheck): + return nil + } +} + +func (p *RestPlugin) OnSpawned(ctx context.Context, sv pool.Instance) {} + +func (p *RestPlugin) OnStarted(ctx context.Context, sv pool.Instance) {} + +func (p *RestPlugin) OnStopped(ctx context.Context, sv pool.Instance, err error) {} + +func (p *RestPlugin) OnFinished(ctx context.Context, sv pool.Instance) {} + +func (p *RestPlugin) MergeFrom(o interface{}) error { + def := defaultRestPlugin() + other := o.(*RestPlugin) + if p.Listen == def.Listen { + p.Listen = other.Listen + } else if other.Listen != def.Listen && other.Listen != p.Listen { + return errors.Errorf("unmatched Rest listen address %v != %v", p.Listen, other.Listen) + } + return nil +} + +func (a *RestPlugin) Close() error { + ctx, closer := context.WithTimeout(context.Background(), 1*time.Second) + defer closer() + return a.server.Shutdown(ctx) +} + +func defaultRestPlugin() *RestPlugin { + return &RestPlugin{ + Listen: "localhost:9900", + } +} + +func init() { + registerPlugin("rest", func(file string) PluginConfigNG { + return defaultRestPlugin() + }) +} diff --git a/plugins/adp_telegram.go b/plugins/adp_telegram.go index a980aa5..1800dd2 100644 --- a/plugins/adp_telegram.go +++ b/plugins/adp_telegram.go @@ -23,7 +23,7 @@ type Telegram struct { hostname string } -func (c *Telegram) Prepare() error { +func (c *Telegram) Prepare(ctx context.Context, pl *pool.Pool) error { c.servicesSet = make(map[string]bool) for _, srv := range c.Services { c.servicesSet[srv] = true diff --git a/plugins/plugin_interface.go b/plugins/plugin_interface.go index 79e9bdb..3f59895 100644 --- a/plugins/plugin_interface.go +++ b/plugins/plugin_interface.go @@ -3,6 +3,7 @@ package plugins import ( "github.com/reddec/monexec/pool" "io" + "context" ) // factories of plugins @@ -30,5 +31,5 @@ type PluginConfigNG interface { // Merge change from other instance. Other is always has same type as original MergeFrom(other interface{}) error // Prepare internal state - Prepare() error + Prepare(ctx context.Context, pl *pool.Pool) error } diff --git a/pool/executable.go b/pool/executable.go index 2fb078d..c25c3d3 100644 --- a/pool/executable.go +++ b/pool/executable.go @@ -137,7 +137,8 @@ func (exe *Executable) run(ctx context.Context) error { } type runnable struct { - Executable *Executable + Executable *Executable `json:"config"` + Running bool `json:"running"` pool *Pool closer func() done chan struct{} @@ -164,6 +165,7 @@ func (rn *runnable) run(ctx context.Context) { rn.pool.OnSpawned(ctx, rn) LOOP: for { + rn.Running = true rn.pool.OnStarted(ctx, rn) err := rn.Executable.run(ctx) if err != nil { @@ -171,6 +173,7 @@ LOOP: } else { rn.Executable.logger().Println("stopped") } + rn.Running = false rn.pool.OnStopped(ctx, rn, err) if restarts != -1 { if restarts <= 0 { diff --git a/sample/sample6_rest.yaml b/sample/sample6_rest.yaml new file mode 100644 index 0000000..56d5828 --- /dev/null +++ b/sample/sample6_rest.yaml @@ -0,0 +1,10 @@ +services: +- label: listener2 + command: /bin/bash + args: + - -c + - nc -l 9001 + stop_timeout: 5s + restart_delay: 1s + restart: 1 +rest: \ No newline at end of file