diff --git a/connection.go b/connection.go index 39ccb57d..961ed4ad 100644 --- a/connection.go +++ b/connection.go @@ -218,9 +218,10 @@ type Connection struct { healthCheckDone chan struct{} healthCheckHistory *healthHistory - // lastActivity is used to track how long the connection has been idle. - // (unix time, nano) - lastActivity atomic.Int64 + // lastActivity{Read,Write} is used to track how long the connection has been + // idle for the recieve and send connections respectively. (unix time, nano) + lastActivityRead atomic.Int64 + lastActivityWrite atomic.Int64 } type peerAddressComponents struct { @@ -322,6 +323,7 @@ func (ch *Channel) newConnection(conn net.Conn, initialID uint32, outboundHP str log = log.WithFields(LogField{"connectionDirection", connDirection}) peerInfo := ch.PeerInfo() + timeNow := ch.timeNow().UnixNano() c := &Connection{ channelConnectionCommon: ch.channelConnectionCommon, @@ -345,7 +347,8 @@ func (ch *Channel) newConnection(conn net.Conn, initialID uint32, outboundHP str events: events, commonStatsTags: ch.commonStatsTags, healthCheckHistory: newHealthHistory(), - lastActivity: *atomic.NewInt64(ch.timeNow().UnixNano()), + lastActivityRead: *atomic.NewInt64(timeNow), + lastActivityWrite: *atomic.NewInt64(timeNow), } if tosPriority := opts.TosPriority; tosPriority > 0 { @@ -666,7 +669,7 @@ func (c *Connection) readFrames(_ uint32) { return } - c.updateLastActivity(frame) + c.updateLastActivityRead(frame) var releaseFrame bool if c.relay == nil { @@ -736,7 +739,7 @@ func (c *Connection) writeFrames(_ uint32) { c.log.Debugf("Writing frame %s", f.Header) } - c.updateLastActivity(f) + c.updateLastActivityWrite(f) err := f.WriteOut(c.conn) c.opts.FramePool.Release(f) if err != nil { @@ -755,13 +758,19 @@ func (c *Connection) writeFrames(_ uint32) { } } -// updateLastActivity marks when the last message was received/sent on the channel. +// updateLastActivityRead marks when the last message was received on the channel. // This is used for monitoring idle connections and timing them out. -func (c *Connection) updateLastActivity(frame *Frame) { - // Pings are ignored for last activity. - switch frame.Header.messageType { - case messageTypeCallReq, messageTypeCallReqContinue, messageTypeCallRes, messageTypeCallResContinue, messageTypeError: - c.lastActivity.Store(c.timeNow().UnixNano()) +func (c *Connection) updateLastActivityRead(frame *Frame) { + if isMessageTypeCall(frame) { + c.lastActivityRead.Store(c.timeNow().UnixNano()) + } +} + +// updateLastActivityWrite marks when the last message was sent on the channel. +// This is used for monitoring idle connections and timing them out. +func (c *Connection) updateLastActivityWrite(frame *Frame) { + if isMessageTypeCall(frame) { + c.lastActivityWrite.Store(c.timeNow().UnixNano()) } } @@ -895,11 +904,18 @@ func (c *Connection) closeNetwork() { } } -// getLastActivityTime returns the timestamp of the last frame read or written, +// getLastActivityReadTime returns the timestamp of the last frame read, +// excluding pings. If no frames were transmitted yet, it will return the time +// this connection was created. +func (c *Connection) getLastActivityReadTime() time.Time { + return time.Unix(0, c.lastActivityRead.Load()) +} + +// getLastActivityWriteTime returns the timestamp of the last frame written, // excluding pings. If no frames were transmitted yet, it will return the time // this connection was created. -func (c *Connection) getLastActivityTime() time.Time { - return time.Unix(0, c.lastActivity.Load()) +func (c *Connection) getLastActivityWriteTime() time.Time { + return time.Unix(0, c.lastActivityWrite.Load()) } func (c *Connection) sendBufSize() (sendBufUsage int, sendBufSize int, _ error) { @@ -937,3 +953,13 @@ func getSysConn(conn net.Conn, log Logger) syscall.RawConn { return sysConn } + +func isMessageTypeCall(frame *Frame) bool { + // Pings are ignored for last activity. + switch frame.Header.messageType { + case messageTypeCallReq, messageTypeCallReqContinue, messageTypeCallRes, messageTypeCallResContinue, messageTypeError: + return true + } + + return false +} diff --git a/connection_test.go b/connection_test.go index 2501e644..a7f57e0b 100644 --- a/connection_test.go +++ b/connection_test.go @@ -1165,20 +1165,28 @@ func TestLastActivityTime(t *testing.T) { responseReceived := make(chan struct{}) // Helper function that checks the last activity time on client, server and relay. - validateLastActivityTime := func(expected time.Time) { + validateLastActivityTime := func(expectedReq time.Time, expectedResp time.Time) { clientConn := getConnection(t, client, outbound) serverConn := getConnection(t, server, inbound) - now := expected.UnixNano() + reqTime := expectedReq.UnixNano() + respTime := expectedResp.UnixNano() - assert.Equal(t, now, clientConn.LastActivity) - assert.Equal(t, now, serverConn.LastActivity) + assert.Equal(t, reqTime, clientConn.LastActivityWrite) + assert.Equal(t, reqTime, serverConn.LastActivityRead) + + assert.Equal(t, respTime, clientConn.LastActivityRead) + assert.Equal(t, respTime, serverConn.LastActivityWrite) // Relays should act like both clients and servers. if ts.HasRelay() { relayInbound := getConnection(t, ts.Relay(), inbound) relayOutbound := getConnection(t, ts.Relay(), outbound) - assert.Equal(t, now, relayInbound.LastActivity) - assert.Equal(t, now, relayOutbound.LastActivity) + + assert.Equal(t, reqTime, relayInbound.LastActivityRead) + assert.Equal(t, reqTime, relayOutbound.LastActivityWrite) + + assert.Equal(t, respTime, relayInbound.LastActivityWrite) + assert.Equal(t, respTime, relayOutbound.LastActivityRead) } } @@ -1191,6 +1199,7 @@ func TestLastActivityTime(t *testing.T) { clock.Elapse(1 * time.Second) }) + initTime := clock.Now() // Run the test twice, because the first call will also establish a connection. for i := 0; i < 2; i++ { beforeCallSent := clock.Now() @@ -1202,15 +1211,19 @@ func TestLastActivityTime(t *testing.T) { // Verify that the last activity time was updated before a response is received. <-callReceived - validateLastActivityTime(beforeCallSent) + validateLastActivityTime(beforeCallSent, initTime) // Let the server respond. blockResponse <- struct{}{} - // After a response was received, time should be +1s. Validate again that - // the last activity time was updated. + // After a response was received, time of the response should be +1s, + // without a change to the requet time. Validate again that the last + // activity time was updated. <-responseReceived - validateLastActivityTime(beforeCallSent.Add(1 * time.Second)) + validateLastActivityTime(beforeCallSent, beforeCallSent.Add(1*time.Second)) + + // Set the initTime as the time of the last response. + initTime = beforeCallSent.Add(1 * time.Second) // Elapse the clock for our next iteration. clock.Elapse(1 * time.Minute) @@ -1244,16 +1257,19 @@ func TestLastActivityTimePings(t *testing.T) { // Verify last activity time. clientConn := getConnection(t, client, outbound) - assert.Equal(t, timeAtStart, clientConn.LastActivity) + assert.Equal(t, timeAtStart, clientConn.LastActivityRead) + assert.Equal(t, timeAtStart, clientConn.LastActivityWrite) // Relays do not pass pings on to the server. if ts.HasRelay() { relayInbound := getConnection(t, ts.Relay(), inbound) - assert.Equal(t, timeAtStart, relayInbound.LastActivity) + assert.Equal(t, timeAtStart, relayInbound.LastActivityRead) + assert.Equal(t, timeAtStart, relayInbound.LastActivityWrite) } serverConn := getConnection(t, ts.Server(), inbound) - assert.Equal(t, timeAtStart, serverConn.LastActivity) + assert.Equal(t, timeAtStart, serverConn.LastActivityRead) + assert.Equal(t, timeAtStart, serverConn.LastActivityWrite) clock.Elapse(1 * time.Second) } diff --git a/idle_sweep.go b/idle_sweep.go index 94815e1b..115ecdfc 100644 --- a/idle_sweep.go +++ b/idle_sweep.go @@ -95,7 +95,12 @@ func (is *idleSweep) checkIdleConnections() { idleConnections := make([]*Connection, 0, 10) is.ch.mutable.RLock() for _, conn := range is.ch.mutable.conns { - if idleTime := now.Sub(conn.getLastActivityTime()); idleTime >= is.maxIdleTime { + lastActivityTime := conn.getLastActivityReadTime() + if sendActivityTime := conn.getLastActivityWriteTime(); lastActivityTime.Before(sendActivityTime) { + lastActivityTime = sendActivityTime + } + + if idleTime := now.Sub(lastActivityTime); idleTime >= is.maxIdleTime { idleConnections = append(idleConnections, conn) } } @@ -116,7 +121,8 @@ func (is *idleSweep) checkIdleConnections() { is.ch.log.WithFields( LogField{"remotePeer", conn.remotePeerInfo}, - LogField{"lastActivityTime", conn.getLastActivityTime()}, + LogField{"lastActivityTimeRead", conn.getLastActivityReadTime()}, + LogField{"lastActivityTimeWrite", conn.getLastActivityWriteTime()}, ).Info("Closing idle inbound connection.") conn.close(LogField{"reason", "Idle connection closed"}) } diff --git a/introspection.go b/introspection.go index dc434738..c80f37e6 100644 --- a/introspection.go +++ b/introspection.go @@ -145,21 +145,22 @@ type SubPeerScore struct { // ConnectionRuntimeState is the runtime state for a single connection. type ConnectionRuntimeState struct { - ID uint32 `json:"id"` - ConnectionState string `json:"connectionState"` - LocalHostPort string `json:"localHostPort"` - RemoteHostPort string `json:"remoteHostPort"` - OutboundHostPort string `json:"outboundHostPort"` - RemotePeer PeerInfo `json:"remotePeer"` - InboundExchange ExchangeSetRuntimeState `json:"inboundExchange"` - OutboundExchange ExchangeSetRuntimeState `json:"outboundExchange"` - Relayer RelayerRuntimeState `json:"relayer"` - HealthChecks []bool `json:"healthChecks,omitempty"` - LastActivity int64 `json:"lastActivity"` - SendChQueued int `json:"sendChQueued"` - SendChCapacity int `json:"sendChCapacity"` - SendBufferUsage int `json:"sendBufferUsage"` - SendBufferSize int `json:"sendBufferSize"` + ID uint32 `json:"id"` + ConnectionState string `json:"connectionState"` + LocalHostPort string `json:"localHostPort"` + RemoteHostPort string `json:"remoteHostPort"` + OutboundHostPort string `json:"outboundHostPort"` + RemotePeer PeerInfo `json:"remotePeer"` + InboundExchange ExchangeSetRuntimeState `json:"inboundExchange"` + OutboundExchange ExchangeSetRuntimeState `json:"outboundExchange"` + Relayer RelayerRuntimeState `json:"relayer"` + HealthChecks []bool `json:"healthChecks,omitempty"` + LastActivityRead int64 `json:"lastActivityRead"` + LastActivityWrite int64 `json:"lastActivityWrite"` + SendChQueued int `json:"sendChQueued"` + SendChCapacity int `json:"sendChCapacity"` + SendBufferUsage int `json:"sendBufferUsage"` + SendBufferSize int `json:"sendBufferSize"` } // RelayerRuntimeState is the runtime state for a single relayer. @@ -361,20 +362,21 @@ func (c *Connection) IntrospectState(opts *IntrospectionOptions) ConnectionRunti // TODO(prashantv): Add total number of health checks, and health check options. state := ConnectionRuntimeState{ - ID: c.connID, - ConnectionState: c.state.String(), - LocalHostPort: c.conn.LocalAddr().String(), - RemoteHostPort: c.conn.RemoteAddr().String(), - OutboundHostPort: c.outboundHP, - RemotePeer: c.remotePeerInfo, - InboundExchange: c.inbound.IntrospectState(opts), - OutboundExchange: c.outbound.IntrospectState(opts), - HealthChecks: c.healthCheckHistory.asBools(), - LastActivity: c.lastActivity.Load(), - SendChQueued: len(c.sendCh), - SendChCapacity: cap(c.sendCh), - SendBufferUsage: sendBufUsage, - SendBufferSize: sendBufSize, + ID: c.connID, + ConnectionState: c.state.String(), + LocalHostPort: c.conn.LocalAddr().String(), + RemoteHostPort: c.conn.RemoteAddr().String(), + OutboundHostPort: c.outboundHP, + RemotePeer: c.remotePeerInfo, + InboundExchange: c.inbound.IntrospectState(opts), + OutboundExchange: c.outbound.IntrospectState(opts), + HealthChecks: c.healthCheckHistory.asBools(), + LastActivityRead: c.lastActivityRead.Load(), + LastActivityWrite: c.lastActivityWrite.Load(), + SendChQueued: len(c.sendCh), + SendChCapacity: cap(c.sendCh), + SendBufferUsage: sendBufUsage, + SendBufferSize: sendBufSize, } if c.relay != nil { state.Relayer = c.relay.IntrospectState(opts)