Skip to content

Commit

Permalink
connection: Separate out lastActivity into recv and send activity
Browse files Browse the repository at this point in the history
`lastActivity` recorded when the last receive or send activity occurred over
its connection. Separate this out into `lastActivityRecv` and
`lastActivitySend` to observe the last receive and the last send independently,
allowing monitoring of the elapsed time between reading from the recv buffer
and writing to the send buffer. This will provide information on how stalled
either buffer is.
  • Loading branch information
witriew committed Mar 27, 2020
1 parent 41382bf commit 9c849e1
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 59 deletions.
56 changes: 41 additions & 15 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,10 @@ type Connection struct {
healthCheckDone chan struct{}
healthCheckHistory *healthHistory

// lastActivity is used to track how long the connection has been idle.
// (unix time, nano)
lastActivity atomic.Int64
// lastActivity{Read,Write} are used to track how long the connection has been
// idle on the receive and send sides respectively. (unix time, nano)
lastActivityRead atomic.Int64
lastActivityWrite atomic.Int64
}

type peerAddressComponents struct {
Expand Down Expand Up @@ -322,6 +323,7 @@ func (ch *Channel) newConnection(conn net.Conn, initialID uint32, outboundHP str

log = log.WithFields(LogField{"connectionDirection", connDirection})
peerInfo := ch.PeerInfo()
timeNow := ch.timeNow().UnixNano()

c := &Connection{
channelConnectionCommon: ch.channelConnectionCommon,
Expand All @@ -345,7 +347,8 @@ func (ch *Channel) newConnection(conn net.Conn, initialID uint32, outboundHP str
events: events,
commonStatsTags: ch.commonStatsTags,
healthCheckHistory: newHealthHistory(),
lastActivity: *atomic.NewInt64(ch.timeNow().UnixNano()),
lastActivityRead: *atomic.NewInt64(timeNow),
lastActivityWrite: *atomic.NewInt64(timeNow),
}

if tosPriority := opts.TosPriority; tosPriority > 0 {
Expand Down Expand Up @@ -666,7 +669,7 @@ func (c *Connection) readFrames(_ uint32) {
return
}

c.updateLastActivity(frame)
c.updateLastActivityRead(frame)

var releaseFrame bool
if c.relay == nil {
Expand Down Expand Up @@ -736,7 +739,7 @@ func (c *Connection) writeFrames(_ uint32) {
c.log.Debugf("Writing frame %s", f.Header)
}

c.updateLastActivity(f)
c.updateLastActivityWrite(f)
err := f.WriteOut(c.conn)
c.opts.FramePool.Release(f)
if err != nil {
Expand All @@ -755,13 +758,19 @@ func (c *Connection) writeFrames(_ uint32) {
}
}

// updateLastActivity marks when the last message was received/sent on the channel.
// updateLastActivityRead marks when the last message was received on the channel.
// This is used for monitoring idle connections and timing them out.
func (c *Connection) updateLastActivity(frame *Frame) {
// Pings are ignored for last activity.
switch frame.Header.messageType {
case messageTypeCallReq, messageTypeCallReqContinue, messageTypeCallRes, messageTypeCallResContinue, messageTypeError:
c.lastActivity.Store(c.timeNow().UnixNano())
func (c *Connection) updateLastActivityRead(frame *Frame) {
	// Only frames accepted by isMessageTypeCall count as activity; any other
	// frame leaves the recorded read time untouched.
	if !isMessageTypeCall(frame) {
		return
	}

	// Stored as a Unix timestamp in nanoseconds.
	now := c.timeNow().UnixNano()
	c.lastActivityRead.Store(now)
}

// updateLastActivityWrite records the current time as the connection's last
// write activity when the given frame is accepted by isMessageTypeCall. Frames
// of any other type leave the stored timestamp unchanged. The recorded value is
// used for monitoring idle connections and timing them out.
func (c *Connection) updateLastActivityWrite(frame *Frame) {
	if !isMessageTypeCall(frame) {
		return
	}

	// Stored as a Unix timestamp in nanoseconds.
	now := c.timeNow().UnixNano()
	c.lastActivityWrite.Store(now)
}

Expand Down Expand Up @@ -895,11 +904,18 @@ func (c *Connection) closeNetwork() {
}
}

// getLastActivityTime returns the timestamp of the last frame read or written,
// getLastActivityReadTime reports when the most recent frame, excluding pings,
// was read on this connection. Until such a frame has been seen, the result is
// the time this connection was created.
func (c *Connection) getLastActivityReadTime() time.Time {
	// The stored value is a Unix timestamp in nanoseconds.
	lastReadNanos := c.lastActivityRead.Load()
	return time.Unix(0, lastReadNanos)
}

// getLastActivityWriteTime returns the timestamp of the last frame written,
// excluding pings. If no frames were transmitted yet, it will return the time
// this connection was created.
func (c *Connection) getLastActivityTime() time.Time {
return time.Unix(0, c.lastActivity.Load())
func (c *Connection) getLastActivityWriteTime() time.Time {
	// The stored value is a Unix timestamp in nanoseconds.
	lastWriteNanos := c.lastActivityWrite.Load()
	return time.Unix(0, lastWriteNanos)
}

func (c *Connection) sendBufSize() (sendBufUsage int, sendBufSize int, _ error) {
Expand Down Expand Up @@ -937,3 +953,13 @@ func getSysConn(conn net.Conn, log Logger) syscall.RawConn {

return sysConn
}

// isMessageTypeCall reports whether the frame's header carries a call request,
// a call response, one of their continuation types, or an error message. Every
// other message type yields false.
func isMessageTypeCall(frame *Frame) bool {
	switch frame.Header.messageType {
	case messageTypeCallReq,
		messageTypeCallReqContinue,
		messageTypeCallRes,
		messageTypeCallResContinue,
		messageTypeError:
		return true
	default:
		// Pings are ignored for last activity.
		return false
	}
}
42 changes: 29 additions & 13 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1165,20 +1165,28 @@ func TestLastActivityTime(t *testing.T) {
responseReceived := make(chan struct{})

// Helper function that checks the last activity time on client, server and relay.
validateLastActivityTime := func(expected time.Time) {
validateLastActivityTime := func(expectedReq time.Time, expectedResp time.Time) {
clientConn := getConnection(t, client, outbound)
serverConn := getConnection(t, server, inbound)
now := expected.UnixNano()
reqTime := expectedReq.UnixNano()
respTime := expectedResp.UnixNano()

assert.Equal(t, now, clientConn.LastActivity)
assert.Equal(t, now, serverConn.LastActivity)
assert.Equal(t, reqTime, clientConn.LastActivityWrite)
assert.Equal(t, reqTime, serverConn.LastActivityRead)

assert.Equal(t, respTime, clientConn.LastActivityRead)
assert.Equal(t, respTime, serverConn.LastActivityWrite)

// Relays should act like both clients and servers.
if ts.HasRelay() {
relayInbound := getConnection(t, ts.Relay(), inbound)
relayOutbound := getConnection(t, ts.Relay(), outbound)
assert.Equal(t, now, relayInbound.LastActivity)
assert.Equal(t, now, relayOutbound.LastActivity)

assert.Equal(t, reqTime, relayInbound.LastActivityRead)
assert.Equal(t, reqTime, relayOutbound.LastActivityWrite)

assert.Equal(t, respTime, relayInbound.LastActivityWrite)
assert.Equal(t, respTime, relayOutbound.LastActivityRead)
}
}

Expand All @@ -1191,6 +1199,7 @@ func TestLastActivityTime(t *testing.T) {
clock.Elapse(1 * time.Second)
})

initTime := clock.Now()
// Run the test twice, because the first call will also establish a connection.
for i := 0; i < 2; i++ {
beforeCallSent := clock.Now()
Expand All @@ -1202,15 +1211,19 @@ func TestLastActivityTime(t *testing.T) {

// Verify that the last activity time was updated before a response is received.
<-callReceived
validateLastActivityTime(beforeCallSent)
validateLastActivityTime(beforeCallSent, initTime)

// Let the server respond.
blockResponse <- struct{}{}

// After a response was received, time should be +1s. Validate again that
// the last activity time was updated.
// After a response was received, time of the response should be +1s,
// without a change to the request time. Validate again that the last
// activity time was updated.
<-responseReceived
validateLastActivityTime(beforeCallSent.Add(1 * time.Second))
validateLastActivityTime(beforeCallSent, beforeCallSent.Add(1*time.Second))

// Set the initTime as the time of the last response.
initTime = beforeCallSent.Add(1 * time.Second)

// Elapse the clock for our next iteration.
clock.Elapse(1 * time.Minute)
Expand Down Expand Up @@ -1244,16 +1257,19 @@ func TestLastActivityTimePings(t *testing.T) {

// Verify last activity time.
clientConn := getConnection(t, client, outbound)
assert.Equal(t, timeAtStart, clientConn.LastActivity)
assert.Equal(t, timeAtStart, clientConn.LastActivityRead)
assert.Equal(t, timeAtStart, clientConn.LastActivityWrite)

// Relays do not pass pings on to the server.
if ts.HasRelay() {
relayInbound := getConnection(t, ts.Relay(), inbound)
assert.Equal(t, timeAtStart, relayInbound.LastActivity)
assert.Equal(t, timeAtStart, relayInbound.LastActivityRead)
assert.Equal(t, timeAtStart, relayInbound.LastActivityWrite)
}

serverConn := getConnection(t, ts.Server(), inbound)
assert.Equal(t, timeAtStart, serverConn.LastActivity)
assert.Equal(t, timeAtStart, serverConn.LastActivityRead)
assert.Equal(t, timeAtStart, serverConn.LastActivityWrite)

clock.Elapse(1 * time.Second)
}
Expand Down
10 changes: 8 additions & 2 deletions idle_sweep.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,12 @@ func (is *idleSweep) checkIdleConnections() {
idleConnections := make([]*Connection, 0, 10)
is.ch.mutable.RLock()
for _, conn := range is.ch.mutable.conns {
if idleTime := now.Sub(conn.getLastActivityTime()); idleTime >= is.maxIdleTime {
lastActivityTime := conn.getLastActivityReadTime()
if sendActivityTime := conn.getLastActivityWriteTime(); lastActivityTime.Before(sendActivityTime) {
lastActivityTime = sendActivityTime
}

if idleTime := now.Sub(lastActivityTime); idleTime >= is.maxIdleTime {
idleConnections = append(idleConnections, conn)
}
}
Expand All @@ -116,7 +121,8 @@ func (is *idleSweep) checkIdleConnections() {

is.ch.log.WithFields(
LogField{"remotePeer", conn.remotePeerInfo},
LogField{"lastActivityTime", conn.getLastActivityTime()},
LogField{"lastActivityTimeRead", conn.getLastActivityReadTime()},
LogField{"lastActivityTimeWrite", conn.getLastActivityWriteTime()},
).Info("Closing idle inbound connection.")
conn.close(LogField{"reason", "Idle connection closed"})
}
Expand Down
60 changes: 31 additions & 29 deletions introspection.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,21 +145,22 @@ type SubPeerScore struct {

// ConnectionRuntimeState is the runtime state for a single connection.
type ConnectionRuntimeState struct {
ID uint32 `json:"id"`
ConnectionState string `json:"connectionState"`
LocalHostPort string `json:"localHostPort"`
RemoteHostPort string `json:"remoteHostPort"`
OutboundHostPort string `json:"outboundHostPort"`
RemotePeer PeerInfo `json:"remotePeer"`
InboundExchange ExchangeSetRuntimeState `json:"inboundExchange"`
OutboundExchange ExchangeSetRuntimeState `json:"outboundExchange"`
Relayer RelayerRuntimeState `json:"relayer"`
HealthChecks []bool `json:"healthChecks,omitempty"`
LastActivity int64 `json:"lastActivity"`
SendChQueued int `json:"sendChQueued"`
SendChCapacity int `json:"sendChCapacity"`
SendBufferUsage int `json:"sendBufferUsage"`
SendBufferSize int `json:"sendBufferSize"`
ID uint32 `json:"id"`
ConnectionState string `json:"connectionState"`
LocalHostPort string `json:"localHostPort"`
RemoteHostPort string `json:"remoteHostPort"`
OutboundHostPort string `json:"outboundHostPort"`
RemotePeer PeerInfo `json:"remotePeer"`
InboundExchange ExchangeSetRuntimeState `json:"inboundExchange"`
OutboundExchange ExchangeSetRuntimeState `json:"outboundExchange"`
Relayer RelayerRuntimeState `json:"relayer"`
HealthChecks []bool `json:"healthChecks,omitempty"`
LastActivityRead int64 `json:"lastActivityRead"`
LastActivityWrite int64 `json:"lastActivityWrite"`
SendChQueued int `json:"sendChQueued"`
SendChCapacity int `json:"sendChCapacity"`
SendBufferUsage int `json:"sendBufferUsage"`
SendBufferSize int `json:"sendBufferSize"`
}

// RelayerRuntimeState is the runtime state for a single relayer.
Expand Down Expand Up @@ -361,20 +362,21 @@ func (c *Connection) IntrospectState(opts *IntrospectionOptions) ConnectionRunti

// TODO(prashantv): Add total number of health checks, and health check options.
state := ConnectionRuntimeState{
ID: c.connID,
ConnectionState: c.state.String(),
LocalHostPort: c.conn.LocalAddr().String(),
RemoteHostPort: c.conn.RemoteAddr().String(),
OutboundHostPort: c.outboundHP,
RemotePeer: c.remotePeerInfo,
InboundExchange: c.inbound.IntrospectState(opts),
OutboundExchange: c.outbound.IntrospectState(opts),
HealthChecks: c.healthCheckHistory.asBools(),
LastActivity: c.lastActivity.Load(),
SendChQueued: len(c.sendCh),
SendChCapacity: cap(c.sendCh),
SendBufferUsage: sendBufUsage,
SendBufferSize: sendBufSize,
ID: c.connID,
ConnectionState: c.state.String(),
LocalHostPort: c.conn.LocalAddr().String(),
RemoteHostPort: c.conn.RemoteAddr().String(),
OutboundHostPort: c.outboundHP,
RemotePeer: c.remotePeerInfo,
InboundExchange: c.inbound.IntrospectState(opts),
OutboundExchange: c.outbound.IntrospectState(opts),
HealthChecks: c.healthCheckHistory.asBools(),
LastActivityRead: c.lastActivityRead.Load(),
LastActivityWrite: c.lastActivityWrite.Load(),
SendChQueued: len(c.sendCh),
SendChCapacity: cap(c.sendCh),
SendBufferUsage: sendBufUsage,
SendBufferSize: sendBufSize,
}
if c.relay != nil {
state.Relayer = c.relay.IntrospectState(opts)
Expand Down

0 comments on commit 9c849e1

Please sign in to comment.