Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change the stater to include a local flag. #1308

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (h *AppAction) Handle(ctx context.Context, a fleetapi.Action, acker acker.A
return fmt.Errorf("invalid type, expected ActionApp and received %T", a)
}

state := h.coord.State()
state := h.coord.State(false)
unit, ok := findUnitFromInputType(state, action.InputType)
if !ok {
// If the matching action is not found ack the action with the error for action result document
Expand Down
20 changes: 16 additions & 4 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ type ErrorReporter interface {
type ConfigManager interface {
Runner

// ActionErrors returns the error channel for actions.
// May return errors for fleet managed agents.
// Will always be empty for stand alone agents.
ActionErrors() <-chan error

// Watch returns the channel to watch for configuration changes.
Watch() <-chan ConfigChange
}
Expand All @@ -137,7 +142,7 @@ type State struct {
// StateFetcher provides an interface to fetch the current state of the coordinator.
type StateFetcher interface {
// State returns the current state of the coordinator.
State() State
State(bool) State
}

// Coordinator manages the entire state of the Elastic Agent.
Expand All @@ -156,6 +161,7 @@ type Coordinator struct {
runtimeMgrErr error
configMgr ConfigManager
configMgrErr error
actionsErr error
varsMgr VarsManager
varsMgrErr error

Expand Down Expand Up @@ -185,7 +191,8 @@ func New(logger *logger.Logger, agentInfo *info.AgentInfo, specs component.Runti
}

// State returns the current state for the coordinator.
func (c *Coordinator) State() (s State) {
// local indicates if local configMgr errors should be reported as part of the state.
func (c *Coordinator) State(local bool) (s State) {
s.State = c.state.state
s.Message = c.state.message
s.Components = c.runtimeMgr.State()
Expand All @@ -201,9 +208,12 @@ func (c *Coordinator) State() (s State) {
if c.runtimeMgrErr != nil {
s.State = agentclient.Failed
s.Message = c.runtimeMgrErr.Error()
} else if c.configMgrErr != nil {
} else if local && c.configMgrErr != nil {
s.State = agentclient.Failed
s.Message = c.configMgrErr.Error()
} else if c.actionsErr != nil {
s.State = agentclient.Failed
s.Message = c.actionsErr.Error()
} else if c.varsMgrErr != nil {
s.State = agentclient.Failed
s.Message = c.varsMgrErr.Error()
Expand Down Expand Up @@ -427,7 +437,7 @@ func (c *Coordinator) DiagnosticHooks() diagnostics.Hooks {
Description: "current state of running components by the Elastic Agent",
ContentType: "application/yaml",
Hook: func(_ context.Context) []byte {
s := c.State()
s := c.State(true)
o, err := yaml.Marshal(s)
if err != nil {
return []byte(fmt.Sprintf("error: %q", err))
Expand Down Expand Up @@ -507,6 +517,8 @@ func (c *Coordinator) runner(ctx context.Context) error {
c.runtimeMgrErr = runtimeErr
case configErr := <-c.configMgr.Errors():
c.configMgrErr = configErr
case actionsErr := <-c.configMgr.ActionErrors():
c.actionsErr = actionsErr
case varsErr := <-c.varsMgr.Errors():
c.varsMgrErr = varsErr
case change := <-configWatcher.Watch():
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/agent/application/coordinator/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type LivenessResponse struct {
// Response code is 200 for a healthy agent, and 503 otherwise.
// Response body is a JSON object that contains the agent ID, status, message, and the last status update time.
func (c *Coordinator) ServeHTTP(wr http.ResponseWriter, req *http.Request) {
s := c.State()
s := c.State(true)
lr := LivenessResponse{
ID: c.agentInfo.AgentID(),
Status: s.State.String(),
Expand Down
4 changes: 4 additions & 0 deletions internal/pkg/agent/application/fleet_server_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ func (m *fleetServerBootstrapManager) Errors() <-chan error {
return m.errCh
}

// ActionErrors returns the error channel for actions.
// Returns nil channel: a receive on a nil channel never proceeds, so no
// action error is ever delivered by this manager.
func (m *fleetServerBootstrapManager) ActionErrors() <-chan error {
return nil
}

// Watch returns the channel (m.ch) on which configuration changes are delivered.
func (m *fleetServerBootstrapManager) Watch() <-chan coordinator.ConfigChange {
return m.ch
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func (f *fleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*flee
f.errCh <- err
return nil, err
}
f.errCh <- err
continue
}
// Request was successful, return the collected actions.
Expand Down Expand Up @@ -286,7 +287,7 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
}

// get current state
state := f.stateFetcher.State()
state := f.stateFetcher.State(false)

// convert components into checkin components structure
components := f.convertToCheckinComponents(state.Components)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func (testAgentInfo) AgentID() string { return "agent-secret" }

type emptyStateFetcher struct{}

func (e *emptyStateFetcher) State() coordinator.State {
func (e *emptyStateFetcher) State(_ bool) coordinator.State {
return coordinator.State{}
}

Expand Down
61 changes: 27 additions & 34 deletions internal/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type managedConfigManager struct {
store storage.Store
stateStore *store.StateStore
actionQueue *queue.ActionQueue
dispatcher *dispatcher.ActionDispatcher
runtime *runtime.Manager
coord *coordinator.Coordinator

Expand Down Expand Up @@ -73,6 +74,11 @@ func newManagedConfigManager(
return nil, fmt.Errorf("unable to initialize action queue: %w", err)
}

actionDispatcher, err := dispatcher.New(log, handlers.NewDefault(log), actionQueue)
if err != nil {
return nil, fmt.Errorf("unable to initialize action dispatcher: %w", err)
}

return &managedConfigManager{
log: log,
agentInfo: agentInfo,
Expand All @@ -81,6 +87,7 @@ func newManagedConfigManager(
store: storeSaver,
stateStore: stateStore,
actionQueue: actionQueue,
dispatcher: actionDispatcher,
runtime: runtime,
ch: make(chan coordinator.ConfigChange),
errCh: make(chan error),
Expand Down Expand Up @@ -108,11 +115,8 @@ func (m *managedConfigManager) Run(ctx context.Context) error {
gatewayCtx, gatewayCancel := context.WithCancel(ctx)
defer gatewayCancel()

// Create the actionDispatcher.
actionDispatcher, policyChanger, err := newManagedActionDispatcher(m, gatewayCancel)
if err != nil {
return err
}
// Initialize the actionDispatcher.
policyChanger := m.initDispatcher(gatewayCancel)

// Create ackers to enqueue/retry failed acks
ack, err := fleet.NewAcker(m.log, m.agentInfo, m.client)
Expand All @@ -139,26 +143,14 @@ func (m *managedConfigManager) Run(ctx context.Context) error {
close(retrierRun)
}()

// Gather errors from the dispatcher and pass to the error channel.
go func() {
for {
select {
case <-ctx.Done():
return
case err := <-actionDispatcher.Errors():
m.errCh <- err // err is one or more failures from dispatching an action
}
}
}()

Comment on lines -142 to -153
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The dispatcher's error channel should be emptied by the coordinator as it's returned by the managed mode's ActionErrors() method

actions := m.stateStore.Actions()
stateRestored := false
if len(actions) > 0 && !m.wasUnenrolled() {
// TODO(ph) We will need an improvement on fleet, if there is an error while dispatching a
// persisted action on disk we should be able to ask Fleet to get the latest configuration.
// But at the moment this is not possible because the policy change was acked.
m.log.Info("restoring current policy from disk")
actionDispatcher.Dispatch(ctx, actionAcker, actions...)
m.dispatcher.Dispatch(ctx, actionAcker, actions...)
stateRestored = true
}

Expand Down Expand Up @@ -221,7 +213,7 @@ func (m *managedConfigManager) Run(ctx context.Context) error {
case <-ctx.Done():
return
case actions := <-gateway.Actions():
actionDispatcher.Dispatch(ctx, actionAcker, actions...)
m.dispatcher.Dispatch(ctx, actionAcker, actions...)
}
}
}()
Expand All @@ -230,6 +222,12 @@ func (m *managedConfigManager) Run(ctx context.Context) error {
return gatewayRunner.Err()
}

// ActionErrors returns the error channel for actions.
// May return errors for fleet managed agents.
// The returned channel is the action dispatcher's own error channel.
func (m *managedConfigManager) ActionErrors() <-chan error {
return m.dispatcher.Errors()
}

// Errors returns the manager's error channel (m.errCh).
func (m *managedConfigManager) Errors() <-chan error {
return m.errCh
}
Expand Down Expand Up @@ -299,12 +297,7 @@ func fleetServerRunning(state runtime.ComponentState) bool {
return false
}

func newManagedActionDispatcher(m *managedConfigManager, canceller context.CancelFunc) (*dispatcher.ActionDispatcher, *handlers.PolicyChange, error) {
actionDispatcher, err := dispatcher.New(m.log, handlers.NewDefault(m.log), m.actionQueue)
if err != nil {
return nil, nil, err
}

func (m *managedConfigManager) initDispatcher(canceller context.CancelFunc) *handlers.PolicyChange {
policyChanger := handlers.NewPolicyChange(
m.log,
m.agentInfo,
Expand All @@ -313,17 +306,17 @@ func newManagedActionDispatcher(m *managedConfigManager, canceller context.Cance
m.ch,
)

actionDispatcher.MustRegister(
m.dispatcher.MustRegister(
&fleetapi.ActionPolicyChange{},
policyChanger,
)

actionDispatcher.MustRegister(
m.dispatcher.MustRegister(
&fleetapi.ActionPolicyReassign{},
handlers.NewPolicyReassign(m.log),
)

actionDispatcher.MustRegister(
m.dispatcher.MustRegister(
&fleetapi.ActionUnenroll{},
handlers.NewUnenroll(
m.log,
Expand All @@ -333,12 +326,12 @@ func newManagedActionDispatcher(m *managedConfigManager, canceller context.Cance
),
)

actionDispatcher.MustRegister(
m.dispatcher.MustRegister(
&fleetapi.ActionUpgrade{},
handlers.NewUpgrade(m.log, m.coord),
)

actionDispatcher.MustRegister(
m.dispatcher.MustRegister(
&fleetapi.ActionSettings{},
handlers.NewSettings(
m.log,
Expand All @@ -347,23 +340,23 @@ func newManagedActionDispatcher(m *managedConfigManager, canceller context.Cance
),
)

actionDispatcher.MustRegister(
m.dispatcher.MustRegister(
&fleetapi.ActionCancel{},
handlers.NewCancel(
m.log,
m.actionQueue,
),
)

actionDispatcher.MustRegister(
m.dispatcher.MustRegister(
&fleetapi.ActionApp{},
handlers.NewAppAction(m.log, m.coord),
)

actionDispatcher.MustRegister(
m.dispatcher.MustRegister(
&fleetapi.ActionUnknown{},
handlers.NewUnknown(m.log),
)

return actionDispatcher, policyChanger, nil
return policyChanger
}
6 changes: 6 additions & 0 deletions internal/pkg/agent/application/once.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ func (o *once) Errors() <-chan error {
return o.errCh
}

// ActionErrors returns the error channel for actions.
// Returns nil channel: a receive on a nil channel never proceeds, so no
// action error is ever delivered by this manager.
func (o *once) ActionErrors() <-chan error {
return nil
}

// Watch returns the channel (o.ch) on which configuration changes are delivered.
func (o *once) Watch() <-chan coordinator.ConfigChange {
return o.ch
}
Expand Down
6 changes: 6 additions & 0 deletions internal/pkg/agent/application/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ func (p *periodic) Errors() <-chan error {
return p.errCh
}

// ActionErrors returns the error channel for actions.
// Returns nil channel: a receive on a nil channel never proceeds, so no
// action error is ever delivered by this manager.
func (p *periodic) ActionErrors() <-chan error {
return nil
}

// Watch returns the channel (p.ch) on which configuration changes are delivered.
func (p *periodic) Watch() <-chan coordinator.ConfigChange {
return p.ch
}
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/agent/control/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (s *Server) Version(_ context.Context, _ *cproto.Empty) (*cproto.VersionRes
func (s *Server) State(_ context.Context, _ *cproto.Empty) (*cproto.StateResponse, error) {
var err error

state := s.coord.State()
state := s.coord.State(true)
components := make([]*cproto.ComponentState, 0, len(state.Components))
for _, comp := range state.Components {
units := make([]*cproto.ComponentUnitState, 0, len(comp.State.Units))
Expand Down Expand Up @@ -166,7 +166,7 @@ func (s *Server) Restart(_ context.Context, _ *cproto.Empty) (*cproto.RestartRes
func (s *Server) Upgrade(ctx context.Context, request *cproto.UpgradeRequest) (*cproto.UpgradeResponse, error) {
err := s.coord.Upgrade(ctx, request.Version, request.SourceURI, nil)
if err != nil {
return &cproto.UpgradeResponse{ //nolint:nilerr // returns err as response
return &cproto.UpgradeResponse{
Status: cproto.ActionStatus_FAILURE,
Error: err.Error(),
}, nil
Expand Down