-
Notifications
You must be signed in to change notification settings - Fork 103
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add timeout to rungroup shutdown #1481
Changes from 2 commits
043918a
2a1fbee
e9d6928
83bada1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,10 +6,13 @@ package rungroup | |
// timeout. See: https://github.com/kolide/launcher/issues/1205 | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"time" | ||
|
||
"github.com/go-kit/kit/log" | ||
"github.com/go-kit/kit/log/level" | ||
"golang.org/x/sync/semaphore" | ||
) | ||
|
||
type ( | ||
|
@@ -30,6 +33,11 @@ type ( | |
} | ||
) | ||
|
||
const ( | ||
interruptTimeout = 5 * time.Second // How long for all actors to return from their `interrupt` function | ||
executeReturnTimeout = 5 * time.Second // After interrupted, how long for all actors to exit their `execute` functions | ||
) | ||
|
||
func NewRunGroup(logger log.Logger) *Group { | ||
return &Group{ | ||
logger: log.With(logger, "component", "run_group"), | ||
|
@@ -65,15 +73,39 @@ func (g *Group) Run() error { | |
level.Debug(g.logger).Log("msg", "received interrupt error from first actor -- shutting down other actors", "err", initialActorErr) | ||
|
||
// Signal all actors to stop. | ||
numActors := int64(len(g.actors)) | ||
interruptWait := semaphore.NewWeighted(numActors) | ||
for _, a := range g.actors { | ||
level.Debug(g.logger).Log("msg", "interrupting actor", "actor", a.name) | ||
a.interrupt(initialActorErr.err) | ||
interruptWait.Acquire(context.Background(), 1) | ||
go func(a rungroupActor) { | ||
defer interruptWait.Release(1) | ||
level.Debug(g.logger).Log("msg", "interrupting actor", "actor", a.name) | ||
a.interrupt(initialActorErr.err) | ||
level.Debug(g.logger).Log("msg", "interrupt complete", "actor", a.name) | ||
}(a) | ||
} | ||
|
||
// Wait for all actors to stop. | ||
interruptCtx, interruptCancel := context.WithTimeout(context.Background(), interruptTimeout) | ||
defer interruptCancel() | ||
|
||
// Wait for interrupts to complete, but only until we hit our interruptCtx timeout | ||
if err := interruptWait.Acquire(interruptCtx, numActors); err != nil { | ||
level.Debug(g.logger).Log("msg", "timeout waiting for interrupts to complete, proceeding with shutdown", "err", err) | ||
} | ||
|
||
// Wait for all other actors to stop, but only until we hit our executeReturnTimeout | ||
timeoutTimer := time.NewTimer(executeReturnTimeout) | ||
defer timeoutTimer.Stop() | ||
for i := 1; i < cap(errors); i++ { | ||
e := <-errors | ||
level.Debug(g.logger).Log("msg", "successfully interrupted actor", "actor", e.errorSourceName, "index", i) | ||
select { | ||
case <-timeoutTimer.C: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this case potentially leaks goroutines, which might leave a zombie process? Might be better than hanging, might need a bigger hammer There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It will in the case of an autoupdate where we're reloading launcher, yeah -- that seemed preferable to hanging. The bigger hammer I can think of is just os.Exit and let launchctl/systemctl/service manager restart launcher, but I know we've been hesitant to do that in the past. |
||
level.Debug(g.logger).Log("msg", "rungroup shutdown deadline exceeded, not waiting for any more actors to return") | ||
|
||
// Return the original error so we can proceed with shutdown | ||
return initialActorErr.err | ||
case e := <-errors: | ||
level.Debug(g.logger).Log("msg", "execute returned", "actor", e.errorSourceName, "index", i) | ||
} | ||
} | ||
|
||
// Return the original error. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
semaphore.NewWeighted
is interesting as compared to a waitgroup. I think it's good, though a little harder to think about all the edges.Maybe cleaner than the channels I used for osquery/osquery-go#108, less sure about perf. But that's not a real concern here.