Skip to content

Commit

Permalink
Change the stater to include a local flag. (#1308)
Browse files Browse the repository at this point in the history
* Change the stater to include a local flag.

Change the state reporter to use a local flag that determines if local
errors are included in the resulting state. Assume that configMgr errors
are all local - this affects mainly the fleet_gateway. Allow the gateway
to report an error if a checkin fails. When a checkin fails the local
state reported through the status command and liveness endpoint will
include the error, but checkins to fleet-server will not.

* Add ActionErrors() method to config manager

Add a new ActionErrors() method to the config managers. For the
non-managed instances it will return a nil channel. For the managed
instances it will return the dispatcher error queue directly. Have the
coordinator gather from this channel as it does for the others and
treat any errors as non-local.

* Fix linter
  • Loading branch information
michel-laterman authored Oct 24, 2022
1 parent 9420497 commit 96e071e
Show file tree
Hide file tree
Showing 10 changed files with 66 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (h *AppAction) Handle(ctx context.Context, a fleetapi.Action, acker acker.A
return fmt.Errorf("invalid type, expected ActionApp and received %T", a)
}

state := h.coord.State()
state := h.coord.State(false)
unit, ok := findUnitFromInputType(state, action.InputType)
if !ok {
// If the matching action is not found ack the action with the error for action result document
Expand Down
20 changes: 16 additions & 4 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ type ErrorReporter interface {
type ConfigManager interface {
Runner

// ActionErrors returns the error channel for actions.
// May return errors for fleet managed agents.
// Will always be empty for stand alone agents; implementations may
// return a nil channel in that case.
ActionErrors() <-chan error

// Watch returns the channel to watch for configuration changes.
Watch() <-chan ConfigChange
}
Expand All @@ -149,7 +154,7 @@ type State struct {
// StateFetcher provides an interface to fetch the current state of the coordinator.
type StateFetcher interface {
// State returns the current state of the coordinator.
State() State
State(bool) State
}

// Coordinator manages the entire state of the Elastic Agent.
Expand All @@ -169,6 +174,7 @@ type Coordinator struct {
runtimeMgrErr error
configMgr ConfigManager
configMgrErr error
actionsErr error
varsMgr VarsManager
varsMgrErr error

Expand Down Expand Up @@ -199,7 +205,8 @@ func New(logger *logger.Logger, agentInfo *info.AgentInfo, specs component.Runti
}

// State returns the current state for the coordinator.
func (c *Coordinator) State() (s State) {
// local indicates if local configMgr errors should be reported as part of the state.
func (c *Coordinator) State(local bool) (s State) {
s.State = c.state.state
s.Message = c.state.message
s.Components = c.runtimeMgr.State()
Expand All @@ -215,9 +222,12 @@ func (c *Coordinator) State() (s State) {
if c.runtimeMgrErr != nil {
s.State = agentclient.Failed
s.Message = c.runtimeMgrErr.Error()
} else if c.configMgrErr != nil {
} else if local && c.configMgrErr != nil {
s.State = agentclient.Failed
s.Message = c.configMgrErr.Error()
} else if c.actionsErr != nil {
s.State = agentclient.Failed
s.Message = c.actionsErr.Error()
} else if c.varsMgrErr != nil {
s.State = agentclient.Failed
s.Message = c.varsMgrErr.Error()
Expand Down Expand Up @@ -441,7 +451,7 @@ func (c *Coordinator) DiagnosticHooks() diagnostics.Hooks {
Description: "current state of running components by the Elastic Agent",
ContentType: "application/yaml",
Hook: func(_ context.Context) []byte {
s := c.State()
s := c.State(true)
o, err := yaml.Marshal(s)
if err != nil {
return []byte(fmt.Sprintf("error: %q", err))
Expand Down Expand Up @@ -521,6 +531,8 @@ func (c *Coordinator) runner(ctx context.Context) error {
c.runtimeMgrErr = runtimeErr
case configErr := <-c.configMgr.Errors():
c.configMgrErr = configErr
case actionsErr := <-c.configMgr.ActionErrors():
c.actionsErr = actionsErr
case varsErr := <-c.varsMgr.Errors():
c.varsMgrErr = varsErr
case change := <-configWatcher.Watch():
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/agent/application/coordinator/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type LivenessResponse struct {
// Response code is 200 for a healthy agent, and 503 otherwise.
// Response body is a JSON object that contains the agent ID, status, message, and the last status update time.
func (c *Coordinator) ServeHTTP(wr http.ResponseWriter, req *http.Request) {
s := c.State()
s := c.State(true)
lr := LivenessResponse{
ID: c.agentInfo.AgentID(),
Status: s.State.String(),
Expand Down
4 changes: 4 additions & 0 deletions internal/pkg/agent/application/fleet_server_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ func (m *fleetServerBootstrapManager) Errors() <-chan error {
return m.errCh
}

// ActionErrors returns the error channel for actions.
// Always returns a nil channel, so a receive on it never yields an error.
func (m *fleetServerBootstrapManager) ActionErrors() <-chan error {
return nil
}

// Watch returns the channel (m.ch) on which configuration changes are delivered.
func (m *fleetServerBootstrapManager) Watch() <-chan coordinator.ConfigChange {
return m.ch
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ func (f *fleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*flee
f.errCh <- err
return nil, err
}
f.errCh <- err
continue
}

Expand Down Expand Up @@ -306,7 +307,7 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
}

// get current state
state := f.stateFetcher.State()
state := f.stateFetcher.State(false)

// convert components into checkin components structure
components := f.convertToCheckinComponents(state.Components)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func (testAgentInfo) AgentID() string { return "agent-secret" }

type emptyStateFetcher struct{}

func (e *emptyStateFetcher) State() coordinator.State {
func (e *emptyStateFetcher) State(_ bool) coordinator.State {
return coordinator.State{}
}

Expand Down
61 changes: 27 additions & 34 deletions internal/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type managedConfigManager struct {
store storage.Store
stateStore *store.StateStore
actionQueue *queue.ActionQueue
dispatcher *dispatcher.ActionDispatcher
runtime *runtime.Manager
coord *coordinator.Coordinator

Expand Down Expand Up @@ -73,6 +74,11 @@ func newManagedConfigManager(
return nil, fmt.Errorf("unable to initialize action queue: %w", err)
}

actionDispatcher, err := dispatcher.New(log, handlers.NewDefault(log), actionQueue)
if err != nil {
return nil, fmt.Errorf("unable to initialize action dispatcher: %w", err)
}

return &managedConfigManager{
log: log,
agentInfo: agentInfo,
Expand All @@ -81,6 +87,7 @@ func newManagedConfigManager(
store: storeSaver,
stateStore: stateStore,
actionQueue: actionQueue,
dispatcher: actionDispatcher,
runtime: runtime,
ch: make(chan coordinator.ConfigChange),
errCh: make(chan error),
Expand Down Expand Up @@ -108,11 +115,8 @@ func (m *managedConfigManager) Run(ctx context.Context) error {
gatewayCtx, gatewayCancel := context.WithCancel(ctx)
defer gatewayCancel()

// Create the actionDispatcher.
actionDispatcher, policyChanger, err := newManagedActionDispatcher(m, gatewayCancel)
if err != nil {
return err
}
// Initialize the actionDispatcher.
policyChanger := m.initDispatcher(gatewayCancel)

// Create ackers to enqueue/retry failed acks
ack, err := fleet.NewAcker(m.log, m.agentInfo, m.client)
Expand All @@ -139,26 +143,14 @@ func (m *managedConfigManager) Run(ctx context.Context) error {
close(retrierRun)
}()

// Gather errors from the dispatcher and pass to the error channel.
go func() {
for {
select {
case <-ctx.Done():
return
case err := <-actionDispatcher.Errors():
m.errCh <- err // err is one or more failures from dispatching an action
}
}
}()

actions := m.stateStore.Actions()
stateRestored := false
if len(actions) > 0 && !m.wasUnenrolled() {
// TODO(ph) We will need an improvement on fleet, if there is an error while dispatching a
// persisted action on disk we should be able to ask Fleet to get the latest configuration.
// But at the moment this is not possible because the policy change was acked.
m.log.Info("restoring current policy from disk")
actionDispatcher.Dispatch(ctx, actionAcker, actions...)
m.dispatcher.Dispatch(ctx, actionAcker, actions...)
stateRestored = true
}

Expand Down Expand Up @@ -221,7 +213,7 @@ func (m *managedConfigManager) Run(ctx context.Context) error {
case <-ctx.Done():
return
case actions := <-gateway.Actions():
actionDispatcher.Dispatch(ctx, actionAcker, actions...)
m.dispatcher.Dispatch(ctx, actionAcker, actions...)
}
}
}()
Expand All @@ -230,6 +222,12 @@ func (m *managedConfigManager) Run(ctx context.Context) error {
return gatewayRunner.Err()
}

// ActionErrors returns the error channel for actions.
// May return errors for fleet managed agents.
// The channel is the action dispatcher's own error channel, returned directly.
func (m *managedConfigManager) ActionErrors() <-chan error {
return m.dispatcher.Errors()
}

// Errors returns the manager's own error channel (m.errCh).
func (m *managedConfigManager) Errors() <-chan error {
return m.errCh
}
Expand Down Expand Up @@ -299,12 +297,7 @@ func fleetServerRunning(state runtime.ComponentState) bool {
return false
}

func newManagedActionDispatcher(m *managedConfigManager, canceller context.CancelFunc) (*dispatcher.ActionDispatcher, *handlers.PolicyChange, error) {
actionDispatcher, err := dispatcher.New(m.log, handlers.NewDefault(m.log), m.actionQueue)
if err != nil {
return nil, nil, err
}

func (m *managedConfigManager) initDispatcher(canceller context.CancelFunc) *handlers.PolicyChange {
policyChanger := handlers.NewPolicyChange(
m.log,
m.agentInfo,
Expand All @@ -313,17 +306,17 @@ func newManagedActionDispatcher(m *managedConfigManager, canceller context.Cance
m.ch,
)

actionDispatcher.MustRegister(
m.dispatcher.MustRegister(
&fleetapi.ActionPolicyChange{},
policyChanger,
)

actionDispatcher.MustRegister(
m.dispatcher.MustRegister(
&fleetapi.ActionPolicyReassign{},
handlers.NewPolicyReassign(m.log),
)

actionDispatcher.MustRegister(
m.dispatcher.MustRegister(
&fleetapi.ActionUnenroll{},
handlers.NewUnenroll(
m.log,
Expand All @@ -333,12 +326,12 @@ func newManagedActionDispatcher(m *managedConfigManager, canceller context.Cance
),
)

actionDispatcher.MustRegister(
m.dispatcher.MustRegister(
&fleetapi.ActionUpgrade{},
handlers.NewUpgrade(m.log, m.coord),
)

actionDispatcher.MustRegister(
m.dispatcher.MustRegister(
&fleetapi.ActionSettings{},
handlers.NewSettings(
m.log,
Expand All @@ -347,23 +340,23 @@ func newManagedActionDispatcher(m *managedConfigManager, canceller context.Cance
),
)

actionDispatcher.MustRegister(
m.dispatcher.MustRegister(
&fleetapi.ActionCancel{},
handlers.NewCancel(
m.log,
m.actionQueue,
),
)

actionDispatcher.MustRegister(
m.dispatcher.MustRegister(
&fleetapi.ActionApp{},
handlers.NewAppAction(m.log, m.coord),
)

actionDispatcher.MustRegister(
m.dispatcher.MustRegister(
&fleetapi.ActionUnknown{},
handlers.NewUnknown(m.log),
)

return actionDispatcher, policyChanger, nil
return policyChanger
}
6 changes: 6 additions & 0 deletions internal/pkg/agent/application/once.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ func (o *once) Errors() <-chan error {
return o.errCh
}

// ActionErrors returns the error channel for actions.
// Always returns a nil channel, so a receive on it never yields an error.
func (o *once) ActionErrors() <-chan error {
return nil
}

// Watch returns the channel (o.ch) on which configuration changes are delivered.
func (o *once) Watch() <-chan coordinator.ConfigChange {
return o.ch
}
Expand Down
6 changes: 6 additions & 0 deletions internal/pkg/agent/application/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ func (p *periodic) Errors() <-chan error {
return p.errCh
}

// ActionErrors returns the error channel for actions.
// Always returns a nil channel, so a receive on it never yields an error.
func (p *periodic) ActionErrors() <-chan error {
return nil
}

// Watch returns the channel (p.ch) on which configuration changes are delivered.
func (p *periodic) Watch() <-chan coordinator.ConfigChange {
return p.ch
}
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/agent/control/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (s *Server) Version(_ context.Context, _ *cproto.Empty) (*cproto.VersionRes
func (s *Server) State(_ context.Context, _ *cproto.Empty) (*cproto.StateResponse, error) {
var err error

state := s.coord.State()
state := s.coord.State(true)
components := make([]*cproto.ComponentState, 0, len(state.Components))
for _, comp := range state.Components {
units := make([]*cproto.ComponentUnitState, 0, len(comp.State.Units))
Expand Down Expand Up @@ -166,7 +166,7 @@ func (s *Server) Restart(_ context.Context, _ *cproto.Empty) (*cproto.RestartRes
func (s *Server) Upgrade(ctx context.Context, request *cproto.UpgradeRequest) (*cproto.UpgradeResponse, error) {
err := s.coord.Upgrade(ctx, request.Version, request.SourceURI, nil)
if err != nil {
return &cproto.UpgradeResponse{ //nolint:nilerr // returns err as response
return &cproto.UpgradeResponse{
Status: cproto.ActionStatus_FAILURE,
Error: err.Error(),
}, nil
Expand Down

0 comments on commit 96e071e

Please sign in to comment.