-
Notifications
You must be signed in to change notification settings - Fork 83
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Automatically close connections after an idle timeout #671
Conversation
Thanks for taking this on @orenkislev! Couple of requests:
Currently, there's a big race condition in |
@prashantv PTAL |
7e76973
to
adf77dc
Compare
The FakeTicker is useful for other tests (see #671), so move it out of the health tests into testutils.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, some high level comments:
- Let's keep it simple/obvious and have it
Start
/Stop
from one location unless there's a big benefit - Try to avoid sleeping in tests. We've fought pretty hard to make the tchannel tests not flaky, and sleeps end up destabilizing these tests.
idle_sweep.go
Outdated
// them from the peer list. | ||
// NOTE: This struct is not thread-safe on its own. Calls to Start() and Stop() | ||
// should be guarded by locking ch.mutable | ||
type IdleSweep struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we have to export this type
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
idle_sweep.go
Outdated
const ( | ||
// defaultMaxIdleTime is the duration a connection is allowed to remain idle | ||
// before it is automatically closed. | ||
defaultMaxIdleTime = 3 * time.Minute |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need a default? can we just force the user to specify a time if they specify a non-zero idle check interval?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
idle_sweep.go
Outdated
|
||
// defaultIdleCheckInterval is the frequency in which the channel checks for | ||
// idle connections. (Default: check is disabled) | ||
defaultIdleCheckInterval = time.Duration(0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if the default is 0, is there much advantage in having this constant?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
idle_sweep.go
Outdated
|
||
// newIdleSweep starts a poller that checks for idle connections at given | ||
// intervals. | ||
func newIdleSweep(ch *Channel, opts *ChannelOptions) *IdleSweep { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if the idle check interval is nil, should we skip creating this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find it better to have an object even if it doesn't do anything, than to have to check for nil everywhere. Makes the code cleaner, and less prone to bugs. I don't feel too strongly about it, if you prefer nil checks.
ch.mutable.Lock() | ||
|
||
if ch.mutable.l != nil { | ||
ch.mutable.l.Close() | ||
} | ||
|
||
// Stop the idle connections timer. | ||
ch.mutable.idleSweep.Stop() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
close could be called multiple times, is it OK to call Stop
multiple times?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IdleSweep.started
will be false, so there won't be any side effects of calling Stop()
multiple times.
idle_sweep_test.go
Outdated
for _, peer := range rootPeers { | ||
in, out := peer.NumConnections() | ||
|
||
ch.Logger().Infof("numConnections IN: %d OUT: %d", in, out) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need these logs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
idle_sweep_test.go
Outdated
|
||
serverOpts := testutils.NewOpts(). | ||
SetTimeNow(clock.Now). | ||
SetRelayOptionsFn(func(relayOpts *testutils.ChannelOpts) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need the relay options fn? can we just create a new server which doesn't have tehse options by using ts.NewServer
, and ignore ts.Server
?
the testutils
channel logic is pretty complicated, so I'd like to avoid extra logic there if possible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
idle_sweep_test.go
Outdated
// Both server and client now have an active connection. After 3 minutes they | ||
// should be cleared out by the idle sweep. | ||
clientTicker.Tick() | ||
time.Sleep(testutils.Timeout(10 * time.Millisecond)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need sleeps here? if anything we should have a testutils.WaitFor
, but I'm strongly against sleeps in tests, they end up being flaky or cause the tests to slow down
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, this was a tough one. The call to checkIdleConnections
happens in a goroutine and we have no synchronization channel to know when it completed. There is additionally some small delay between disconnecting the client-side until the server-side receives and processes the disconnect, even though they are both happening in localhost.
I changed to use testutils.WaitFor
but I'm open to other suggestions.
testutils/ticker.go
Outdated
|
||
package testutils | ||
|
||
import ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can simplify to import "time"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done (in your branch)
health_ext_test.go
Outdated
@@ -75,7 +46,7 @@ func TestHealthCheckStopBeforeStart(t *testing.T) { | |||
}) | |||
defer cancel() | |||
|
|||
ft := newFakeTicker() | |||
ft := testutils.NewFakeTicker() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've created #674 to move this out separately to reduce noise in this diff (and to add some simple tests to it).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you! I will merge and rebase
adf77dc
to
a3dcce2
Compare
idle_sweep.go
Outdated
return | ||
} | ||
|
||
if is.maxIdleTime <= time.Duration(0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this sort of validation should happen when creating the channel and return an error.
also, no need to type the constant here, you can do if is.maxIdleTime <= 0
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's also add test coverage for this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
channel.go
Outdated
@@ -299,6 +313,9 @@ func (ch *Channel) Serve(l net.Listener) error { | |||
mutable.peerInfo.IsEphemeral = false | |||
ch.log = ch.log.WithFields(LogField{"hostPort", mutable.peerInfo.HostPort}) | |||
|
|||
// Start the idle connection timer. | |||
ch.mutable.idleSweep.Start() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm I don't think we allow channels to be reused. Serve
ensures the channel state is ChannelClient
(which is the initial state), and we don't allow going backwards anywhere.
idle_sweep.go
Outdated
return | ||
} | ||
|
||
is.ch.log.Info("Stopping idle connections poller.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need for 2 logs here, just "Stopping" is enough, since nothing below blocks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
idle_sweep.go
Outdated
|
||
for _, conn := range is.ch.mutable.conns { | ||
idleTime := now.Sub(conn.getLastActivityTime()) | ||
if idleTime >= is.maxIdleTime { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
optional: can scope the idleTime
just to the if,
if idleTime := now.Sub(..); idleTime >= is.maxIdleTime {
[...]
}
in general I prefer to scope variables to the minimum scope where they're needed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
idle_sweep.go
Outdated
} | ||
} | ||
|
||
is.ch.mutable.RUnlock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you move the RLock
and RUnlock
right next to the if, we can treat that as a single block.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
idle_sweep.go
Outdated
|
||
for _, conn := range idleConnections { | ||
is.ch.log.WithFields( | ||
LogField{"remotePeer", conn.remotePeerInfo}).Info("Closing idle inbound connection.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we log the last activity time and the idle time here. if we're calculating them again, we might want to double check that it really is idle just in case of a race.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
idle_sweep.go
Outdated
for _, conn := range idleConnections { | ||
is.ch.log.WithFields( | ||
LogField{"remotePeer", conn.remotePeerInfo}).Info("Closing idle inbound connection.") | ||
conn.close(LogField{"reason", "Idle connection closed"}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think when we close a connection, it doesn't immediately get removed from ch.mutable.conns, so we may end up closing multiple times. that should be safe, but just wanted to call that out.
we could instead only close connections that are active here. (suggest here to avoid holding the RLock
for too long, and the filter of idle but not active is very unlikely, so probably won't get a benefit from doing it earlier)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
channel.go
Outdated
@@ -218,6 +229,12 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) { | |||
LogField{"chID", chID}, | |||
) | |||
|
|||
if opts.IdleCheckInterval > 0 && opts.MaxIdleTime <= 0 { | |||
logger.Warn("To enable automatically removing idle connections, you must " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should return an error, no point allowing users to create a channel with invalid config.
Suggest making an unexported function on ChannelOptions
that does this,
func (o *ChannelOpts) validateIdleCheck() error {
...
}
that way we can just return an error from validation:
if err := opts.validateIdleCheck(); err != nil {
return nil, err
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
channel.go
Outdated
|
||
// Start the idle connection timer. | ||
ch.mutable.idleSweep = newIdleSweep(ch, opts) | ||
ch.mutable.idleSweep.Start() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no point separating the new and start, we can simplify by just having startIdleSweep
and then we don't have the double-start problem
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
||
// Stop kills the poller checking for idle connections. | ||
func (is *idleSweep) Stop() { | ||
if !is.started { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can combine started
with the stopCh
. on start, set stopCh
, on Stop
, if stopCh == nil
, there's nothing running. once we close(is.stopCh)
, we can set it to nil
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
EDIT: Your suggestion causes a race condition. I went back to the original implementation with started
.
Either you set stopCh to nil after closing it, in which case you have the goroutine trying to select
on a nil variable. Or you can set it to nil from the goroutine, but then Stop() might return without stopCh being nil, which might cause a bug if Stop gets called twice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ahh, fair enough, let's stick to this implementation. thanks
idle_sweep.go
Outdated
|
||
// Acquire the read lock and examine which connections are idle. | ||
idleConnections := make([]*Connection, 0, 10) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's remove this blank line as well, so the comment is with the block.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
idle_sweep.go
Outdated
|
||
is.ch.log.WithFields( | ||
LogField{"remotePeer", conn.remotePeerInfo}, | ||
LogField{"lastActivityTime", conn.getLastActivityTime()}).Info("Closing idle inbound connection.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: move the Info(..)
to the next line so fields are clearly separated from the message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
idle_sweep_test.go
Outdated
return strings.Join(status, ", ") | ||
} | ||
|
||
func waitForZeroConnections(t *testing.T, channels ...*Channel) bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we could actually events here:
// OnPeerStatusChanged is an optional callback that receives a notification
// whenever the channel establishes a usable connection to a peer, or loses
// a connection to a peer.
OnPeerStatusChanged func(*Peer)
consider setting up events so we know when there's a connection change rather than waiting in a loop
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! I fixed some minor nits (whitespace, using testutils.Timeout
).
Once tests pass, I'll squash the commits and update the description to match the PR and land.
Really appreciate the contribution @orenkislev, thank you!
Fixes #616
If IdleCheckInterval is set (default: disabled), each channel will run a goroutine that periodically checks the last activity time on each connection, and disconnects inbound/outbound connections on which the last activity time exceeds MaxIdleTime (default: 3min).
Note that pings do not count as activity.