Skip to content

Commit

Permalink
add REST api
Browse files Browse the repository at this point in the history
  • Loading branch information
reddec committed Jun 22, 2018
1 parent e4d36df commit 775fa9e
Show file tree
Hide file tree
Showing 11 changed files with 156 additions and 11 deletions.
6 changes: 3 additions & 3 deletions monexec/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (config *Config) Run(sv *pool.Pool, ctx context.Context) error {
// Initialize plugins
//-- prepare and add all plugins
for pluginName, pluginInstance := range config.loadedPlugins {
err := pluginInstance.Prepare()
err := pluginInstance.Prepare(ctx, sv)
if err != nil {
log.Println("failed prepare plugin", pluginName, "-", err)
} else {
Expand Down Expand Up @@ -131,8 +131,8 @@ func LoadConfig(locations ...string) (*Config, error) {
}

var wrap = description

if reflect.ValueOf(wrap).Type().Kind() == reflect.Slice {
refVal := reflect.ValueOf(wrap)
if wrap != nil && refVal.Type().Kind() == reflect.Slice {
wrap = map[string]interface{}{
"<ITEMS>": description,
}
Expand Down
2 changes: 1 addition & 1 deletion plugins/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

type MyPlugin struct {}

func (p *MyPlugin) Prepare() error { return nil }
func (p *MyPlugin) Prepare(ctx context.Context, pl *pool.Pool) error { return nil }

func (p *MyPlugin) OnSpawned(ctx context.Context, sv pool.Instance) {}

Expand Down
2 changes: 1 addition & 1 deletion plugins/adp_consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func DefaultConsul() ConsulPlugin {
}
}

func (p *ConsulPlugin) Prepare() error {
func (p *ConsulPlugin) Prepare(ctx context.Context, pl *pool.Pool) error {
consulConfig := api.DefaultConfig()
consulConfig.Address = p.URL
consul, err := api.NewClient(consulConfig)
Expand Down
2 changes: 1 addition & 1 deletion plugins/adp_critical.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (a *Critical) MergeFrom(other interface{}) (error) {
return nil
}

func (a *Critical) Prepare() error {
func (a *Critical) Prepare(ctx context.Context, pl *pool.Pool) error {
return nil
}
func (a *Critical) Close() error { return nil }
Expand Down
2 changes: 1 addition & 1 deletion plugins/adp_email.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (c *Email) OnStopped(ctx context.Context, sv pool.Instance, err error) {

func (p *Email) OnFinished(ctx context.Context, sv pool.Instance) {}

func (c *Email) Prepare() error {
func (c *Email) Prepare(ctx context.Context, pl *pool.Pool) error {
c.servicesSet = makeSet(c.Services)
c.log = log.New(os.Stderr, "[email] ", log.LstdFlags)
c.hostname, _ = os.Hostname()
Expand Down
2 changes: 1 addition & 1 deletion plugins/adp_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (c *Http) OnStopped(ctx context.Context, sv pool.Instance, err error) {

func (p *Http) OnFinished(ctx context.Context, sv pool.Instance) {}
func (a *Http) Close() error { return nil }
func (c *Http) Prepare() error {
func (c *Http) Prepare(ctx context.Context, pl *pool.Pool) error {
c.servicesSet = makeSet(c.Services)
c.log = log.New(os.Stderr, "[http] ", log.LstdFlags)
if c.Method == "" {
Expand Down
131 changes: 131 additions & 0 deletions plugins/adp_rest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package plugins

import (
"context"
"github.com/reddec/monexec/pool"
"github.com/pkg/errors"
"net/http"
"time"
"github.com/gin-gonic/gin"
)

const restApiStartupCheck = 1 * time.Second

type RestPlugin struct {
Listen string `yaml:"listen"`
server *http.Server
}

func (p *RestPlugin) Prepare(ctx context.Context, pl *pool.Pool) error {

router := gin.Default()

router.GET("/supervisors", func(gctx *gin.Context) {
var names = make([]string, 0)
for _, sv := range pl.Supervisors() {
names = append(names, sv.Config().Name)
}
gctx.JSON(http.StatusOK, names)
})
router.GET("/supervisor/:name", func(gctx *gin.Context) {
name := gctx.Param("name")
for _, sv := range pl.Supervisors() {
if sv.Config().Name == name {
gctx.JSON(http.StatusOK, sv.Config())
return
}
}
gctx.AbortWithStatus(http.StatusNotFound)
})
router.POST("/supervisor/:name", func(gctx *gin.Context) {
name := gctx.Param("name")
for _, sv := range pl.Supervisors() {
if sv.Config().Name == name {
in := pl.Start(ctx, sv)
gctx.JSON(http.StatusOK, in)
return
}
}
gctx.AbortWithStatus(http.StatusNotFound)
})
router.GET("/instances", func(gctx *gin.Context) {
var names = make([]string, 0)
for _, sv := range pl.Instances() {
names = append(names, sv.Config().Name)
}
gctx.JSON(http.StatusOK, names)
})

router.GET("/instance/:name", func(gctx *gin.Context) {
name := gctx.Param("name")
for _, sv := range pl.Instances() {
if sv.Config().Name == name {
gctx.JSON(http.StatusOK, sv)
return
}
}
gctx.AbortWithStatus(http.StatusNotFound)
})

router.POST("/instance/:name", func(gctx *gin.Context) {
name := gctx.Param("name")
for _, sv := range pl.Instances() {
if sv.Config().Name == name {
pl.Stop(sv)
gctx.AbortWithStatus(http.StatusCreated)
return
}
}
gctx.AbortWithStatus(http.StatusNotFound)
})

p.server = &http.Server{Addr: p.Listen, Handler: router}

start := make(chan error, 1)
go func() {
start <- p.server.ListenAndServe()
}()
select {
case err := <-start:
return err
case <-time.After(restApiStartupCheck):
return nil
}
}

func (p *RestPlugin) OnSpawned(ctx context.Context, sv pool.Instance) {}

func (p *RestPlugin) OnStarted(ctx context.Context, sv pool.Instance) {}

func (p *RestPlugin) OnStopped(ctx context.Context, sv pool.Instance, err error) {}

func (p *RestPlugin) OnFinished(ctx context.Context, sv pool.Instance) {}

func (p *RestPlugin) MergeFrom(o interface{}) error {
def := defaultRestPlugin()
other := o.(*RestPlugin)
if p.Listen == def.Listen {
p.Listen = other.Listen
} else if other.Listen != def.Listen && other.Listen != p.Listen {
return errors.Errorf("unmatched Rest listen address %v != %v", p.Listen, other.Listen)
}
return nil
}

func (a *RestPlugin) Close() error {
ctx, closer := context.WithTimeout(context.Background(), 1*time.Second)
defer closer()
return a.server.Shutdown(ctx)
}

func defaultRestPlugin() *RestPlugin {
return &RestPlugin{
Listen: "localhost:9900",
}
}

func init() {
registerPlugin("rest", func(file string) PluginConfigNG {
return defaultRestPlugin()
})
}
2 changes: 1 addition & 1 deletion plugins/adp_telegram.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type Telegram struct {
hostname string
}

func (c *Telegram) Prepare() error {
func (c *Telegram) Prepare(ctx context.Context, pl *pool.Pool) error {
c.servicesSet = make(map[string]bool)
for _, srv := range c.Services {
c.servicesSet[srv] = true
Expand Down
3 changes: 2 additions & 1 deletion plugins/plugin_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package plugins
import (
"github.com/reddec/monexec/pool"
"io"
"context"
)

// factories of plugins
Expand Down Expand Up @@ -30,5 +31,5 @@ type PluginConfigNG interface {
// Merge change from other instance. Other is always has same type as original
MergeFrom(other interface{}) error
// Prepare internal state
Prepare() error
Prepare(ctx context.Context, pl *pool.Pool) error
}
5 changes: 4 additions & 1 deletion pool/executable.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ func (exe *Executable) run(ctx context.Context) error {
}

type runnable struct {
Executable *Executable
Executable *Executable `json:"config"`
Running bool `json:"running"`
pool *Pool
closer func()
done chan struct{}
Expand All @@ -164,13 +165,15 @@ func (rn *runnable) run(ctx context.Context) {
rn.pool.OnSpawned(ctx, rn)
LOOP:
for {
rn.Running = true
rn.pool.OnStarted(ctx, rn)
err := rn.Executable.run(ctx)
if err != nil {
rn.Executable.logger().Println("stopped with error:", err)
} else {
rn.Executable.logger().Println("stopped")
}
rn.Running = false
rn.pool.OnStopped(ctx, rn, err)
if restarts != -1 {
if restarts <= 0 {
Expand Down
10 changes: 10 additions & 0 deletions sample/sample6_rest.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
services:
- label: listener2
command: /bin/bash
args:
- -c
- nc -l 9001
stop_timeout: 5s
restart_delay: 1s
restart: 1
rest:

0 comments on commit 775fa9e

Please sign in to comment.