
Fix agent shutdown on SIGINT #1258

Merged
11 commits merged on Sep 29, 2022
2 changes: 1 addition & 1 deletion go.mod
@@ -1,6 +1,6 @@
module github.com/elastic/elastic-agent

go 1.17
go 1.18

require (
github.com/Microsoft/go-winio v0.5.2
6 changes: 3 additions & 3 deletions internal/pkg/agent/application/coordinator/coordinator.go
@@ -310,7 +310,7 @@ func (c *Coordinator) runner(ctx context.Context) error {

runtimeWatcher := c.runtimeMgr
runtimeRun := make(chan bool)
runtimeErrCh := make(chan error)
runtimeErrCh := make(chan error, 1)
go func(manager Runner) {
err := manager.Run(ctx)
close(runtimeRun)
@@ -319,7 +319,7 @@ func (c *Coordinator) runner(ctx context.Context) error {

configWatcher := c.configMgr
configRun := make(chan bool)
configErrCh := make(chan error)
configErrCh := make(chan error, 1)
go func(manager Runner) {
err := manager.Run(ctx)
close(configRun)
@@ -328,7 +328,7 @@ func (c *Coordinator) runner(ctx context.Context) error {

varsWatcher := c.varsMgr
varsRun := make(chan bool)
varsErrCh := make(chan error)
varsErrCh := make(chan error, 1)
Contributor:
Can you explain to me why this is needed? The select/case reads from all these channels, so why would a buffered channel be needed here?

Member Author:
All the places where the unbuffered channels were changed to buffered are the channels that were blocking on shutdown. So either something was writing to a channel that nothing was reading from (the read loop exited before the writers were fully stopped), or something was reading from a channel that nothing was writing to.
This fix is not ideal, I understand. We might have to fix more code in order to avoid buffered channels.
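
To illustrate the blocking scenario described here, a minimal sketch (not elastic-agent code): a manager goroutine reports its exit error on an unbuffered channel, but the coordinator's read loop has already returned, so the send blocks forever and the goroutine is leaked; with a one-element buffer the send would complete even though nobody reads it.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	errCh := make(chan error) // unbuffered: the send below needs a live reader
	// errCh := make(chan error, 1) // buffered: the send completes even after the reader is gone

	go func() {
		<-ctx.Done()
		errCh <- errors.New("manager stopped")    // blocks forever with the unbuffered channel
		fmt.Println("manager goroutine finished") // never printed in the unbuffered case
	}()

	cancel() // simulate SIGINT: the read loop has already returned, nothing drains errCh
	time.Sleep(200 * time.Millisecond)
	fmt.Println("main exits; the manager goroutine is still blocked on the send")
}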

Contributor @michalpristas (Sep 22, 2022):
I don't like having buffered channels here; it signals that our logic is broken somewhere. We should aim at fixing the real issue, not the symptom.
However, if we agree that this is sufficient for now, at least add a TODO there and create an issue. Otherwise we're creating technical debt that we will probably never deal with.

Member Author:
This is changed back now; after fixing some underlying components, nothing blocks on these channels anymore.

go func(manager Runner) {
err := manager.Run(ctx)
close(varsRun)
@@ -124,7 +124,7 @@ func newFleetGatewayWithScheduler(
acker: acker,
stateFetcher: stateFetcher,
stateStore: stateStore,
errCh: make(chan error),
errCh: make(chan error, 1),
Contributor:
Same here? Errors() should be watched by the coordinator, so why does this need to be changed to buffered? What is causing this to block unless you make it buffered?

Member Author:
Fixed managed_mode coordination with the fleet gateway. Now the gateway errors reading loop waits until the gateway exits. Otherwise, if the gateway shuts down out of sequence, it can block on errCh.
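
A minimal sketch of the coordination pattern described above (hypothetical names, not the actual fleet gateway code): keep draining the gateway's error channel until the gateway itself signals that it has exited, so a late error send can never block shutdown.

// watchGatewayErrors forwards gateway errors until the gateway is done.
func watchGatewayErrors(done <-chan struct{}, errCh <-chan error, report func(error)) {
	for {
		select {
		case <-done:
			return // the gateway has exited; safe to stop reading
		case err := <-errCh:
			report(err)
		}
	}
}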

}, nil
}

2 changes: 1 addition & 1 deletion internal/pkg/fleetapi/acker/retrier/retrier.go
@@ -98,7 +98,7 @@ func (r *Retrier) Run(ctx context.Context) {
case <-r.kickCh:
r.runRetries(ctx)
case <-ctx.Done():
r.log.Debug("ack retrier: exit on %v", ctx.Err())
r.log.Debugf("ack retrier: exit on %v", ctx.Err())
return
}
}
5 changes: 4 additions & 1 deletion pkg/component/runtime/runtime.go
@@ -176,7 +176,10 @@ func (s *componentRuntimeState) destroy() {
if s.watchCanceller != nil {
s.watchCanceller()
s.watchCanceller = nil
<-s.watchChan
// Do not wait on watchChan here
// the watch loop calls stateChanged that calls "destroy" (this function) and will block forever
// since the watch channel will never be closed because the watch loop is blocked on stateChanged call
// <-s.watchChan
Contributor:
I think we need to ensure that we wait for the watcher goroutine to stop. Just commenting this out seems like a hacky way of fixing it.

Maybe we could use a different channel to know when it's done? Or a waitgroup?

Member Author:
I'll rewrite how the runners are managed. It currently deadlocks waiting for this channel to be closed, while the channel can't be closed until this function returns.

Member Author:
This is rewritten now; I think it's cleaner.
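
One possible shape for such a rewrite, as a sketch with hypothetical types (not the actual change in this PR): destroy only signals cancellation, the watch goroutine closes its own done channel when it returns, and waiting for it happens outside the watch loop's call path.

import "context"

type watcherState struct {
	cancel context.CancelFunc
	done   chan struct{} // closed by the watch goroutine when it returns
}

func (w *watcherState) startWatch(ctx context.Context, watch func(context.Context)) {
	ctx, w.cancel = context.WithCancel(ctx)
	w.done = make(chan struct{})
	go func() {
		defer close(w.done)
		watch(ctx) // the loop that may end up calling destroy via stateChanged
	}()
}

// destroy signals the watcher to stop but does not wait: waiting here would
// deadlock when destroy is reached from inside the watch loop itself.
func (w *watcherState) destroy() {
	if w.cancel != nil {
		w.cancel()
		w.cancel = nil
	}
}

// waitStopped is called by the owner, outside the watch loop, when it needs
// to know that the goroutine has actually finished.
func (w *watcherState) waitStopped() {
	<-w.done
}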

}
}

2 changes: 1 addition & 1 deletion pkg/component/runtime/runtime_comm.go
@@ -83,7 +83,7 @@ func newRuntimeComm(logger *logger.Logger, listenAddr string, ca *authority.Cert
token: token.String(),
cert: pair,
checkinConn: true,
checkinExpected: make(chan *proto.CheckinExpected),
checkinExpected: make(chan *proto.CheckinExpected, 1),
Contributor:
Why the change here? I would prefer to not buffer expected configurations.

Member Author:
For this one, the "command" runtime sends "CheckinExpected" while the Beat, for example, has failed to connect over gRPC, so the V2 check-in call from the client is never made.
That means the loop that reads from the unbuffered channel never runs:
https://github.com/elastic/elastic-agent/blob/feature-arch-v2/pkg/component/runtime/runtime_comm.go#L184

and the CheckinExpected call made by the "command" runtime is blocked forever until the component comes online and kicks off the "check-in" sequence.

We can either:

  1. cache the latest "CheckinExpected" state until the component checks in over gRPC, so we can send something back, or
  2. drop it on the floor, losing some state that we are supposed to synchronize on.

I think we would have to redo the comms to keep the last state and not block on that channel.

Member Author:
Pushed another commit with the change in runtime_comm that addresses this particular issue according to the explanation above, implementing caching of the latest state in a buffered channel of size 1.
d5f7de0
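
The pattern behind that change, sketched minimally (sendLatest is an illustrative helper, not the actual runtime_comm API): a buffered channel of size 1 acts as a one-slot mailbox, so the sender never blocks while the component is offline, and a stale expected state is replaced rather than queued. Since this PR bumps go.mod to Go 1.18, a generic helper is used for the sketch.

// sendLatest stores v in the one-slot channel, replacing any unread value.
// It assumes a single writer; under that assumption the final send cannot block.
func sendLatest[T any](ch chan T, v T) {
	select {
	case ch <- v: // the slot was empty: value stored
	default:
		select {
		case <-ch: // drop the stale, unread value
		default:
		}
		ch <- v
	}
}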

checkinObserved: make(chan *proto.CheckinObserved),
actionsConn: true,
actionsRequest: make(chan *proto.ActionRequest),