Skip to content

Commit

Permalink
Rename to management.Manager, add UpdateStatus to Manager interface. (e…
Browse files Browse the repository at this point in the history
…lastic#19114)

* Rename management.ConfigManager to management.Manager, add UpdateStatus to Manager interface.

* Update docstring for Failed status.

* Add to developer changelog.

* Add StatusReporter interface, wrap client.Status in lock. Add tests for statusToProtoStatus.
  • Loading branch information
blakerouse authored and melchiormoulin committed Oct 14, 2020
1 parent 1a9e35c commit a146cac
Show file tree
Hide file tree
Showing 10 changed files with 149 additions and 36 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
Your magefile.go will require a change to adapt the devtool API. See the pull request for
more details. {pull}18148[18148]
- The Elasticsearch client settings expect the API key to be raw (not base64-encoded). {issue}18939[18939] {pull}18945[18945]
- `management.ConfigManager` has been renamed to `management.Manager`. {pull}19114[19114]
- `UpdateStatus` has been added to the `management.Manager` interface. {pull}19114[19114]

==== Bugfixes

Expand Down
2 changes: 1 addition & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
haveEnabledInputs = true
}

if !config.ConfigInput.Enabled() && !config.ConfigModules.Enabled() && !haveEnabledInputs && config.Autodiscover == nil && !b.ConfigManager.Enabled() {
if !config.ConfigInput.Enabled() && !config.ConfigModules.Enabled() && !haveEnabledInputs && config.Autodiscover == nil && !b.Manager.Enabled() {
if !b.InSetupCmd {
return nil, errors.New("no modules or inputs enabled and configuration reloading disabled. What files do you want me to watch?")
}
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (bt *Heartbeat) Run(b *beat.Beat) error {
return err
}

if b.ConfigManager.Enabled() {
if b.Manager.Enabled() {
bt.RunCentralMgmtMonitors(b)
}

Expand Down
2 changes: 1 addition & 1 deletion libbeat/beat/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type Beat struct {

Fields []byte // Data from fields.yml

ConfigManager management.ConfigManager // config manager
Manager management.Manager // manager

Keystore keystore.Keystore
}
Expand Down
12 changes: 6 additions & 6 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,12 +330,12 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {

// Report central management state
mgmt := monitoring.GetNamespace("state").GetRegistry().NewRegistry("management")
monitoring.NewBool(mgmt, "enabled").Set(b.ConfigManager.Enabled())
monitoring.NewBool(mgmt, "enabled").Set(b.Manager.Enabled())

debugf("Initializing output plugins")
outputEnabled := b.Config.Output.IsSet() && b.Config.Output.Config().Enabled()
if !outputEnabled {
if b.ConfigManager.Enabled() {
if b.Manager.Enabled() {
logp.Info("Output is configured through Central Management")
} else {
msg := "No outputs are defined. Please define one under the output section."
Expand Down Expand Up @@ -462,8 +462,8 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error {
logp.Info("%s start running.", b.Info.Beat)

// Launch config manager
b.ConfigManager.Start(beater.Stop)
defer b.ConfigManager.Stop()
b.Manager.Start(beater.Stop)
defer b.Manager.Stop()

return beater.Run(&b.Beat)
}
Expand Down Expand Up @@ -643,12 +643,12 @@ func (b *Beat) configure(settings Settings) error {
logp.Info("Beat ID: %v", b.Info.ID)

// initialize config manager
b.ConfigManager, err = management.Factory(b.Config.Management)(b.Config.Management, reload.Register, b.Beat.Info.ID)
b.Manager, err = management.Factory(b.Config.Management)(b.Config.Management, reload.Register, b.Beat.Info.ID)
if err != nil {
return err
}

if err := b.ConfigManager.CheckRawConfig(b.RawConfig); err != nil {
if err := b.Manager.CheckRawConfig(b.RawConfig); err != nil {
return err
}

Expand Down
85 changes: 69 additions & 16 deletions libbeat/management/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,36 @@
package management

import (
"sync"

"github.com/gofrs/uuid"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/logp"
)

// Status describes the current status of the beat.
type Status int

const (
// Unknown is initial status when none has been reported.
Unknown Status = iota
// Starting is status describing application is starting.
Starting
// Configuring is status describing application is configuring.
Configuring
// Running is status describing application is running.
Running
// Degraded is status describing application is degraded.
Degraded
// Failed is status describing application is failed. This status should
// only be used in the case the beat should stop running as the failure
// cannot be recovered.
Failed
// Stopping is status describing application is stopping.
Stopping
)

// Namespace is the feature namespace for queue definition.
Expand All @@ -33,27 +58,36 @@ var DebugK = "centralmgmt"

var centralMgmtKey = "x-pack-cm"

// ConfigManager interacts with the beat to update configurations
// from an external source
type ConfigManager interface {
// Enabled returns true if config manager is enabled
// StatusReporter provides a method to update current status of the beat.
type StatusReporter interface {
// UpdateStatus called when the status of the beat has changed.
UpdateStatus(status Status, msg string)
}

// Manager interacts with the beat to provide status updates and to receive
// configurations.
type Manager interface {
StatusReporter

// Enabled returns true if manager is enabled.
Enabled() bool

// Start the config manager
Start(func())
// Start the config manager giving it a stopFunc callback
// so the beat can be told when to stop.
Start(stopFunc func())

// Stop the config manager
// Stop the config manager.
Stop()

// CheckRawConfig check settings are correct before launching the beat
// CheckRawConfig check settings are correct before launching the beat.
CheckRawConfig(cfg *common.Config) error
}

// PluginFunc for creating FactoryFunc if it matches a config
type PluginFunc func(*common.Config) FactoryFunc

// FactoryFunc for creating a config manager
type FactoryFunc func(*common.Config, *reload.Registry, uuid.UUID) (ConfigManager, error)
type FactoryFunc func(*common.Config, *reload.Registry, uuid.UUID) (Manager, error)

// Register a config manager
func Register(name string, fn PluginFunc, stability feature.Stability) {
Expand Down Expand Up @@ -91,13 +125,32 @@ func defaultModeConfig() *modeConfig {
}

// nilManager, fallback when no manager is present
type nilManager struct{}
type nilManager struct {
logger *logp.Logger
lock sync.Mutex
status Status
msg string
}

func nilFactory(*common.Config, *reload.Registry, uuid.UUID) (ConfigManager, error) {
return nilManager{}, nil
func nilFactory(*common.Config, *reload.Registry, uuid.UUID) (Manager, error) {
log := logp.NewLogger("mgmt")
return &nilManager{
logger: log,
status: Unknown,
msg: "",
}, nil
}

func (nilManager) Enabled() bool { return false }
func (nilManager) Start(_ func()) {}
func (nilManager) Stop() {}
func (nilManager) CheckRawConfig(cfg *common.Config) error { return nil }
func (*nilManager) Enabled() bool { return false }
func (*nilManager) Start(_ func()) {}
func (*nilManager) Stop() {}
func (*nilManager) CheckRawConfig(cfg *common.Config) error { return nil }
func (n *nilManager) UpdateStatus(status Status, msg string) {
n.lock.Lock()
defer n.lock.Unlock()
if n.status != status || n.msg != msg {
n.status = status
n.msg = msg
n.logger.Infof("Status change to %s: %s", status, msg)
}
}
2 changes: 1 addition & 1 deletion metricbeat/beater/metricbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func newMetricbeat(b *beat.Beat, c *common.Config, options ...Option) (*Metricbe
return nil, errors.Wrap(err, "error reading configuration file")
}

dynamicCfgEnabled := config.ConfigModules.Enabled() || config.Autodiscover != nil || b.ConfigManager.Enabled()
dynamicCfgEnabled := config.ConfigModules.Enabled() || config.Autodiscover != nil || b.Manager.Enabled()
if !dynamicCfgEnabled && len(config.Modules) == 0 {
return nil, mb.ErrEmptyConfig
}
Expand Down
53 changes: 46 additions & 7 deletions x-pack/libbeat/management/fleet/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"os"
"sort"
"sync"

"github.com/gofrs/uuid"
"github.com/pkg/errors"
Expand All @@ -35,12 +36,15 @@ type Manager struct {
registry *reload.Registry
blacklist *xmanagement.ConfigBlacklist
client *client.Client
lock sync.Mutex
status management.Status
msg string

stopFunc func()
}

// NewFleetManager returns a X-Pack Beats Fleet Management manager.
func NewFleetManager(config *common.Config, registry *reload.Registry, beatUUID uuid.UUID) (management.ConfigManager, error) {
func NewFleetManager(config *common.Config, registry *reload.Registry, beatUUID uuid.UUID) (management.Manager, error) {
c := defaultConfig()
if config.Enabled() {
if err := config.Unpack(&c); err != nil {
Expand All @@ -51,7 +55,7 @@ func NewFleetManager(config *common.Config, registry *reload.Registry, beatUUID
}

// NewFleetManagerWithConfig returns a X-Pack Beats Fleet Management manager.
func NewFleetManagerWithConfig(c *Config, registry *reload.Registry, beatUUID uuid.UUID) (management.ConfigManager, error) {
func NewFleetManagerWithConfig(c *Config, registry *reload.Registry, beatUUID uuid.UUID) (management.Manager, error) {
log := logp.NewLogger(management.DebugK)

m := &Manager{
Expand Down Expand Up @@ -122,38 +126,51 @@ func (cm *Manager) CheckRawConfig(cfg *common.Config) error {
return nil
}

// UpdateStatus updates the manager with the current status for the beat.
func (cm *Manager) UpdateStatus(status management.Status, msg string) {
cm.lock.Lock()
defer cm.lock.Unlock()

if cm.status != status || cm.msg != msg {
cm.status = status
cm.msg = msg
cm.client.Status(statusToProtoStatus(status), msg)
cm.logger.Infof("Status change to %s: %s", status, msg)
}
}

func (cm *Manager) OnConfig(s string) {
cm.client.Status(proto.StateObserved_CONFIGURING, "Updating configuration")
cm.UpdateStatus(management.Configuring, "Updating configuration")

var configMap common.MapStr
uconfig, err := common.NewConfigFrom(s)
if err != nil {
err = errors.Wrap(err, "config blocks unsuccessfully generated")
cm.logger.Error(err)
cm.client.Status(proto.StateObserved_FAILED, err.Error())
cm.UpdateStatus(management.Failed, err.Error())
return
}

err = uconfig.Unpack(&configMap)
if err != nil {
err = errors.Wrap(err, "config blocks unsuccessfully generated")
cm.logger.Error(err)
cm.client.Status(proto.StateObserved_FAILED, err.Error())
cm.UpdateStatus(management.Failed, err.Error())
return
}

blocks, err := cm.toConfigBlocks(configMap)
if err != nil {
err = errors.Wrap(err, "could not apply the configuration")
cm.logger.Error(err)
cm.client.Status(proto.StateObserved_FAILED, err.Error())
cm.UpdateStatus(management.Failed, err.Error())
return
}

if errs := cm.apply(blocks); !errs.IsEmpty() {
err = errors.Wrap(err, "could not apply the configuration")
cm.logger.Error(err)
cm.client.Status(proto.StateObserved_FAILED, err.Error())
cm.UpdateStatus(management.Failed, err.Error())
return
}

Expand Down Expand Up @@ -285,3 +302,25 @@ func (cm *Manager) toConfigBlocks(cfg common.MapStr) (api.ConfigBlocks, error) {

return res, nil
}

func statusToProtoStatus(status management.Status) proto.StateObserved_Status {
switch status {
case management.Unknown:
// unknown is reported as healthy, as the status is unknown
return proto.StateObserved_HEALTHY
case management.Starting:
return proto.StateObserved_STARTING
case management.Configuring:
return proto.StateObserved_CONFIGURING
case management.Running:
return proto.StateObserved_HEALTHY
case management.Degraded:
return proto.StateObserved_DEGRADED
case management.Failed:
return proto.StateObserved_FAILED
case management.Stopping:
return proto.StateObserved_STOPPING
}
// unknown status, still reported as healthy
return proto.StateObserved_HEALTHY
}
16 changes: 15 additions & 1 deletion x-pack/libbeat/management/fleet/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@ package fleet
import (
"testing"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/stretchr/testify/assert"

"github.com/elastic/elastic-agent-client/v7/pkg/proto"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/management"
)

func TestConfigBlocks(t *testing.T) {
Expand Down Expand Up @@ -53,6 +57,16 @@ output:
}
}

func TestStatusToProtoStatus(t *testing.T) {
assert.Equal(t, proto.StateObserved_HEALTHY, statusToProtoStatus(management.Unknown))
assert.Equal(t, proto.StateObserved_STARTING, statusToProtoStatus(management.Starting))
assert.Equal(t, proto.StateObserved_CONFIGURING, statusToProtoStatus(management.Configuring))
assert.Equal(t, proto.StateObserved_HEALTHY, statusToProtoStatus(management.Running))
assert.Equal(t, proto.StateObserved_DEGRADED, statusToProtoStatus(management.Degraded))
assert.Equal(t, proto.StateObserved_FAILED, statusToProtoStatus(management.Failed))
assert.Equal(t, proto.StateObserved_STOPPING, statusToProtoStatus(management.Stopping))
}

type dummyReloadable struct{}

func (dummyReloadable) Reload(config *reload.ConfigWithMeta) error {
Expand Down
9 changes: 7 additions & 2 deletions x-pack/libbeat/management/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type ConfigManager struct {
}

// NewConfigManager returns a X-Pack Beats Central Management manager
func NewConfigManager(config *common.Config, registry *reload.Registry, beatUUID uuid.UUID) (management.ConfigManager, error) {
func NewConfigManager(config *common.Config, registry *reload.Registry, beatUUID uuid.UUID) (management.Manager, error) {
c := defaultConfig()
if config.Enabled() {
if err := config.Unpack(&c); err != nil {
Expand All @@ -54,7 +54,7 @@ func NewConfigManager(config *common.Config, registry *reload.Registry, beatUUID
}

// NewConfigManagerWithConfig returns a X-Pack Beats Central Management manager
func NewConfigManagerWithConfig(c *Config, registry *reload.Registry, beatUUID uuid.UUID) (management.ConfigManager, error) {
func NewConfigManagerWithConfig(c *Config, registry *reload.Registry, beatUUID uuid.UUID) (management.Manager, error) {
var client *api.Client
var cache *Cache
var blacklist *ConfigBlacklist
Expand Down Expand Up @@ -152,6 +152,11 @@ func (cm *ConfigManager) CheckRawConfig(cfg *common.Config) error {
return nil
}

// UpdateStatus updates the manager with the current status for the beat.
func (cm *ConfigManager) UpdateStatus(_ management.Status, _ string) {
// do nothing; no longer under development and has been deprecated
}

func (cm *ConfigManager) worker() {
defer cm.wg.Done()

Expand Down

0 comments on commit a146cac

Please sign in to comment.