Skip to content

Commit

Permalink
Delay the restart of application when a status report of failure is g…
Browse files Browse the repository at this point in the history
…iven (#25339)

* Delay the restart of application when a status report of failure is given.

* Add changelog.

* Fix test and make it configurable.

* Run mage check

(cherry picked from commit 371871e)
  • Loading branch information
blakerouse authored and mergify-bot committed Apr 28, 2021
1 parent 86c4631 commit 0df11af
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 24 deletions.
2 changes: 2 additions & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,10 @@
- Fixed: limit for retries to Kibana configurable {issue}25063[25063]
- Fix issue with status and inspect inside of container {pull}25204[25204]
- Remove FLEET_SERVER_POLICY_NAME env variable as it was not used {pull}25149[25149]
- Reduce log level for listener cleanup to debug {pull}25274
- Passing in policy id to container command works {pull}25352[25352]
- Reduce log level for listener cleanup to debug {pull}25274[25274]
- Delay the restart of application when a status report of failure is given {pull}25339[25339]

==== New features

Expand Down
7 changes: 0 additions & 7 deletions x-pack/elastic-agent/pkg/agent/cmd/enroll_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,13 +612,6 @@ func waitForFleetServer(ctx context.Context, agentSubproc <-chan *os.ProcessStat
}
resChan <- waitResult{enrollmentToken: token}
break
} else if app.Status == proto.Status_FAILED {
// app completely failed; exit now
if app.Message != "" {
log.Infof("Fleet Server - %s", app.Message)
}
resChan <- waitResult{err: errors.New(app.Message)}
break
}
if app.Message != "" {
appMsg := fmt.Sprintf("Fleet Server - %s", app.Message)
Expand Down
4 changes: 3 additions & 1 deletion x-pack/elastic-agent/pkg/agent/operation/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ func getTestOperator(t *testing.T, downloadPath string, installPath string, p *a
Delay: 3 * time.Second,
MaxDelay: 10 * time.Second,
},
ProcessConfig: &process.Config{},
ProcessConfig: &process.Config{
FailureTimeout: 1, // restart instantly
},
DownloadConfig: &artifact.Config{
TargetDirectory: downloadPath,
InstallPath: installPath,
Expand Down
4 changes: 3 additions & 1 deletion x-pack/elastic-agent/pkg/core/plugin/process/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ type Application struct {

logger *logger.Logger

appLock sync.Mutex
appLock sync.Mutex
restartCanceller context.CancelFunc
restartConfig map[string]interface{}
}

// ArgsDecorator decorates arguments before calling an application
Expand Down
77 changes: 66 additions & 11 deletions x-pack/elastic-agent/pkg/core/plugin/process/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
package process

import (
"context"
"fmt"
"time"

"gopkg.in/yaml.v2"

Expand Down Expand Up @@ -35,21 +37,74 @@ func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.St
return
}

// kill the process
if a.state.ProcessInfo != nil {
_ = a.state.ProcessInfo.Process.Kill()
a.state.ProcessInfo = nil
}
ctx := a.startContext
tag := a.tag

// it was marshalled to pass into the state, so unmarshall will always succeed
var cfg map[string]interface{}
_ = yaml.Unmarshal([]byte(s.Config()), &cfg)

err := a.start(ctx, tag, cfg)
if err != nil {
a.setState(state.Crashed, fmt.Sprintf("failed to restart: %s", err), nil)
// start the failed timer
a.startFailedTimer(cfg)
} else {
a.stopFailedTimer()
}
}

// startFailedTimer starts a timer that will restart the application if it doesn't exit failed after a period of time.
//
// This does not grab the appLock, that must be managed by the caller.
func (a *Application) startFailedTimer(cfg map[string]interface{}) {
if a.restartCanceller != nil {
// already have running failed timer; just update config
a.restartConfig = cfg
return
}

ctx, cancel := context.WithCancel(a.startContext)
a.restartCanceller = cancel
a.restartConfig = cfg
t := time.NewTimer(a.processConfig.FailureTimeout)
go func() {
defer func() {
a.appLock.Lock()
a.restartCanceller = nil
a.restartConfig = nil
a.appLock.Unlock()
}()

select {
case <-ctx.Done():
return
case <-t.C:
a.restart(a.restartConfig)
}
}()
}

// stopFailedTimer stops the timer that would restart the application from reporting failure.
//
// This does not grab the appLock, that must be managed by the caller.
func (a *Application) stopFailedTimer() {
if a.restartCanceller == nil {
return
}
a.restartCanceller()
a.restartCanceller = nil
}

// restart restarts the application
func (a *Application) restart(cfg map[string]interface{}) {
a.appLock.Lock()
defer a.appLock.Unlock()

// kill the process
if a.state.ProcessInfo != nil {
_ = a.state.ProcessInfo.Process.Kill()
a.state.ProcessInfo = nil
}
ctx := a.startContext
tag := a.tag

err := a.start(ctx, tag, cfg)
if err != nil {
a.setState(state.Crashed, fmt.Sprintf("failed to restart: %s", err), nil)
}
}
10 changes: 6 additions & 4 deletions x-pack/elastic-agent/pkg/core/process/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,18 @@ import "time"

// Config for fine tuning new process
type Config struct {
SpawnTimeout time.Duration `yaml:"spawn_timeout" config:"spawn_timeout"`
StopTimeout time.Duration `yaml:"stop_timeout" config:"stop_timeout"`
SpawnTimeout time.Duration `yaml:"spawn_timeout" config:"spawn_timeout"`
StopTimeout time.Duration `yaml:"stop_timeout" config:"stop_timeout"`
FailureTimeout time.Duration `yaml:"failure_timeout" config:"failure_timeout"`

// TODO: cgroups and namespaces
}

// DefaultConfig creates a config with pre-set default values.
func DefaultConfig() *Config {
return &Config{
SpawnTimeout: 30 * time.Second,
StopTimeout: 30 * time.Second,
SpawnTimeout: 30 * time.Second,
StopTimeout: 30 * time.Second,
FailureTimeout: 10 * time.Second,
}
}

0 comments on commit 0df11af

Please sign in to comment.