Skip to content
This repository has been archived by the owner on Jan 21, 2020. It is now read-only.

Clean shutdown of infrakit plugin start --wait #379

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 71 additions & 3 deletions cmd/cli/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strconv"
"sync"
"syscall"
"time"

log "github.com/Sirupsen/logrus"
"github.com/docker/infrakit/pkg/discovery"
Expand Down Expand Up @@ -104,13 +105,34 @@ func pluginCommand(plugins func() discovery.Plugins) *cobra.Command {
input = append(input, ch)
}

var wait sync.WaitGroup
// This is the channel to send signal that plugins are stopped out of band so stop waiting.
noRunningPlugins := make(chan struct{})
// This is the channel for completion of waiting.
waitDone := make(chan struct{})
// This is the channel to stop scanning for running plugins.
pluginScanDone := make(chan struct{})

var wait sync.WaitGroup
if *doWait {
wait.Add(1)
go func() {
wait.Wait() // wait for everyone to complete
close(waitDone)
}()
}

// Now start all the plugins
started := []string{}

// We do a count of the plugins running before we start.
var before, after = 0, 0

if m, err := plugins().List(); err != nil {
log.Warningln("Problem listing current plugins:", err, "continue.")
} else {
before = len(m)
}

// now start all the plugins
for _, pluginToStart := range args {
fmt.Println("Starting up", pluginToStart)

Expand All @@ -123,6 +145,8 @@ func pluginCommand(plugins func() discovery.Plugins) *cobra.Command {
Plugin: plugin.Name(name),
Started: func(config *types.Any) {
fmt.Println(name, "started.")

started = append(started, name)
wait.Done()
},
Error: func(config *types.Any, err error) {
Expand All @@ -133,11 +157,55 @@ func pluginCommand(plugins func() discovery.Plugins) *cobra.Command {
}
}

wait.Wait() // wait for everyone to complete
if m, err := plugins().List(); err == nil {
after = len(m)
}

// Here we scan the plugins. If we are starting up the plugins, wait a little bit
// for them to show up. Then we start scanning to see if the sockets are gone.
// If the sockets are gone, then we can safely exit.
if *doWait {
go func() {
interval := 5 * time.Second

now := after
if now <= before {
// Here we have fewer plugins running then before. Wait a bit
time.Sleep(interval)
}
checkNow := time.Tick(interval)
for {
select {
case <-pluginScanDone:
log.Infoln("--wait mode: stop scanning.")
return

case <-checkNow:
if m, err := plugins().List(); err == nil {
now = len(m)
}
if now == 0 {
log.Infoln("--wait mode: scan found no plugins.")
close(noRunningPlugins)
}
}
}
}()
}

// Here we wait for either wait group to be done or if they are killed out of band.
select {
case <-waitDone:
log.Infoln("All plugins completed. Exiting.")
case <-noRunningPlugins:
log.Infoln("Plugins aren't running anymore. Exiting.")
}

for _, monitor := range monitors {
monitor.Stop()
}

close(pluginScanDone)
return nil
}

Expand Down