From 9c849e1d0ed858da272a61c22abf22e32db62de1 Mon Sep 17 00:00:00 2001 From: Wit Riewrangboonya Date: Mon, 16 Mar 2020 22:46:32 -0700 Subject: [PATCH] connection: Separate out lastActivity into recv and send activity `lastActivity` recorded when the last receive or send activity occurred over its connection. Separate this out into `lastActivityRecv` and `lastActivitySend` to observe the last receives or sends allowing monitoring of the elapsed time between reading from the recv buffer and writing to the send buffer. This will provide information on how stalled either buffer is. --- connection.go | 56 +++++++++++++++++++++++++++++++------------ connection_test.go | 42 ++++++++++++++++++++++---------- idle_sweep.go | 10 ++++++-- introspection.go | 60 ++++++++++++++++++++++++---------------------- 4 files changed, 109 insertions(+), 59 deletions(-) diff --git a/connection.go b/connection.go index 39ccb57d..961ed4ad 100644 --- a/connection.go +++ b/connection.go @@ -218,9 +218,10 @@ type Connection struct { healthCheckDone chan struct{} healthCheckHistory *healthHistory - // lastActivity is used to track how long the connection has been idle. - // (unix time, nano) - lastActivity atomic.Int64 + // lastActivity{Read,Write} is used to track how long the connection has been + // idle for the recieve and send connections respectively. (unix time, nano) + lastActivityRead atomic.Int64 + lastActivityWrite atomic.Int64 } type peerAddressComponents struct { @@ -322,6 +323,7 @@ func (ch *Channel) newConnection(conn net.Conn, initialID uint32, outboundHP str log = log.WithFields(LogField{"connectionDirection", connDirection}) peerInfo := ch.PeerInfo() + timeNow := ch.timeNow().UnixNano() c := &Connection{ channelConnectionCommon: ch.channelConnectionCommon, @@ -345,7 +347,8 @@ func (ch *Channel) newConnection(conn net.Conn, initialID uint32, outboundHP str events: events, commonStatsTags: ch.commonStatsTags, healthCheckHistory: newHealthHistory(), - lastActivity: *atomic.NewInt64(ch.timeNow().UnixNano()), + lastActivityRead: *atomic.NewInt64(timeNow), + lastActivityWrite: *atomic.NewInt64(timeNow), } if tosPriority := opts.TosPriority; tosPriority > 0 { @@ -666,7 +669,7 @@ func (c *Connection) readFrames(_ uint32) { return } - c.updateLastActivity(frame) + c.updateLastActivityRead(frame) var releaseFrame bool if c.relay == nil { @@ -736,7 +739,7 @@ func (c *Connection) writeFrames(_ uint32) { c.log.Debugf("Writing frame %s", f.Header) } - c.updateLastActivity(f) + c.updateLastActivityWrite(f) err := f.WriteOut(c.conn) c.opts.FramePool.Release(f) if err != nil { @@ -755,13 +758,19 @@ func (c *Connection) writeFrames(_ uint32) { } } -// updateLastActivity marks when the last message was received/sent on the channel. +// updateLastActivityRead marks when the last message was received on the channel. // This is used for monitoring idle connections and timing them out. -func (c *Connection) updateLastActivity(frame *Frame) { - // Pings are ignored for last activity. - switch frame.Header.messageType { - case messageTypeCallReq, messageTypeCallReqContinue, messageTypeCallRes, messageTypeCallResContinue, messageTypeError: - c.lastActivity.Store(c.timeNow().UnixNano()) +func (c *Connection) updateLastActivityRead(frame *Frame) { + if isMessageTypeCall(frame) { + c.lastActivityRead.Store(c.timeNow().UnixNano()) + } +} + +// updateLastActivityWrite marks when the last message was sent on the channel. +// This is used for monitoring idle connections and timing them out. +func (c *Connection) updateLastActivityWrite(frame *Frame) { + if isMessageTypeCall(frame) { + c.lastActivityWrite.Store(c.timeNow().UnixNano()) } } @@ -895,11 +904,18 @@ func (c *Connection) closeNetwork() { } } -// getLastActivityTime returns the timestamp of the last frame read or written, +// getLastActivityReadTime returns the timestamp of the last frame read, +// excluding pings. If no frames were transmitted yet, it will return the time +// this connection was created. +func (c *Connection) getLastActivityReadTime() time.Time { + return time.Unix(0, c.lastActivityRead.Load()) +} + +// getLastActivityWriteTime returns the timestamp of the last frame written, // excluding pings. If no frames were transmitted yet, it will return the time // this connection was created. -func (c *Connection) getLastActivityTime() time.Time { - return time.Unix(0, c.lastActivity.Load()) +func (c *Connection) getLastActivityWriteTime() time.Time { + return time.Unix(0, c.lastActivityWrite.Load()) } func (c *Connection) sendBufSize() (sendBufUsage int, sendBufSize int, _ error) { @@ -937,3 +953,13 @@ func getSysConn(conn net.Conn, log Logger) syscall.RawConn { return sysConn } + +func isMessageTypeCall(frame *Frame) bool { + // Pings are ignored for last activity. + switch frame.Header.messageType { + case messageTypeCallReq, messageTypeCallReqContinue, messageTypeCallRes, messageTypeCallResContinue, messageTypeError: + return true + } + + return false +} diff --git a/connection_test.go b/connection_test.go index 2501e644..a7f57e0b 100644 --- a/connection_test.go +++ b/connection_test.go @@ -1165,20 +1165,28 @@ func TestLastActivityTime(t *testing.T) { responseReceived := make(chan struct{}) // Helper function that checks the last activity time on client, server and relay. - validateLastActivityTime := func(expected time.Time) { + validateLastActivityTime := func(expectedReq time.Time, expectedResp time.Time) { clientConn := getConnection(t, client, outbound) serverConn := getConnection(t, server, inbound) - now := expected.UnixNano() + reqTime := expectedReq.UnixNano() + respTime := expectedResp.UnixNano() - assert.Equal(t, now, clientConn.LastActivity) - assert.Equal(t, now, serverConn.LastActivity) + assert.Equal(t, reqTime, clientConn.LastActivityWrite) + assert.Equal(t, reqTime, serverConn.LastActivityRead) + + assert.Equal(t, respTime, clientConn.LastActivityRead) + assert.Equal(t, respTime, serverConn.LastActivityWrite) // Relays should act like both clients and servers. if ts.HasRelay() { relayInbound := getConnection(t, ts.Relay(), inbound) relayOutbound := getConnection(t, ts.Relay(), outbound) - assert.Equal(t, now, relayInbound.LastActivity) - assert.Equal(t, now, relayOutbound.LastActivity) + + assert.Equal(t, reqTime, relayInbound.LastActivityRead) + assert.Equal(t, reqTime, relayOutbound.LastActivityWrite) + + assert.Equal(t, respTime, relayInbound.LastActivityWrite) + assert.Equal(t, respTime, relayOutbound.LastActivityRead) } } @@ -1191,6 +1199,7 @@ func TestLastActivityTime(t *testing.T) { clock.Elapse(1 * time.Second) }) + initTime := clock.Now() // Run the test twice, because the first call will also establish a connection. for i := 0; i < 2; i++ { beforeCallSent := clock.Now() @@ -1202,15 +1211,19 @@ func TestLastActivityTime(t *testing.T) { // Verify that the last activity time was updated before a response is received. <-callReceived - validateLastActivityTime(beforeCallSent) + validateLastActivityTime(beforeCallSent, initTime) // Let the server respond. blockResponse <- struct{}{} - // After a response was received, time should be +1s. Validate again that - // the last activity time was updated. + // After a response was received, time of the response should be +1s, + // without a change to the requet time. Validate again that the last + // activity time was updated. <-responseReceived - validateLastActivityTime(beforeCallSent.Add(1 * time.Second)) + validateLastActivityTime(beforeCallSent, beforeCallSent.Add(1*time.Second)) + + // Set the initTime as the time of the last response. + initTime = beforeCallSent.Add(1 * time.Second) // Elapse the clock for our next iteration. clock.Elapse(1 * time.Minute) @@ -1244,16 +1257,19 @@ func TestLastActivityTimePings(t *testing.T) { // Verify last activity time. clientConn := getConnection(t, client, outbound) - assert.Equal(t, timeAtStart, clientConn.LastActivity) + assert.Equal(t, timeAtStart, clientConn.LastActivityRead) + assert.Equal(t, timeAtStart, clientConn.LastActivityWrite) // Relays do not pass pings on to the server. if ts.HasRelay() { relayInbound := getConnection(t, ts.Relay(), inbound) - assert.Equal(t, timeAtStart, relayInbound.LastActivity) + assert.Equal(t, timeAtStart, relayInbound.LastActivityRead) + assert.Equal(t, timeAtStart, relayInbound.LastActivityWrite) } serverConn := getConnection(t, ts.Server(), inbound) - assert.Equal(t, timeAtStart, serverConn.LastActivity) + assert.Equal(t, timeAtStart, serverConn.LastActivityRead) + assert.Equal(t, timeAtStart, serverConn.LastActivityWrite) clock.Elapse(1 * time.Second) } diff --git a/idle_sweep.go b/idle_sweep.go index 94815e1b..115ecdfc 100644 --- a/idle_sweep.go +++ b/idle_sweep.go @@ -95,7 +95,12 @@ func (is *idleSweep) checkIdleConnections() { idleConnections := make([]*Connection, 0, 10) is.ch.mutable.RLock() for _, conn := range is.ch.mutable.conns { - if idleTime := now.Sub(conn.getLastActivityTime()); idleTime >= is.maxIdleTime { + lastActivityTime := conn.getLastActivityReadTime() + if sendActivityTime := conn.getLastActivityWriteTime(); lastActivityTime.Before(sendActivityTime) { + lastActivityTime = sendActivityTime + } + + if idleTime := now.Sub(lastActivityTime); idleTime >= is.maxIdleTime { idleConnections = append(idleConnections, conn) } } @@ -116,7 +121,8 @@ func (is *idleSweep) checkIdleConnections() { is.ch.log.WithFields( LogField{"remotePeer", conn.remotePeerInfo}, - LogField{"lastActivityTime", conn.getLastActivityTime()}, + LogField{"lastActivityTimeRead", conn.getLastActivityReadTime()}, + LogField{"lastActivityTimeWrite", conn.getLastActivityWriteTime()}, ).Info("Closing idle inbound connection.") conn.close(LogField{"reason", "Idle connection closed"}) } diff --git a/introspection.go b/introspection.go index dc434738..c80f37e6 100644 --- a/introspection.go +++ b/introspection.go @@ -145,21 +145,22 @@ type SubPeerScore struct { // ConnectionRuntimeState is the runtime state for a single connection. type ConnectionRuntimeState struct { - ID uint32 `json:"id"` - ConnectionState string `json:"connectionState"` - LocalHostPort string `json:"localHostPort"` - RemoteHostPort string `json:"remoteHostPort"` - OutboundHostPort string `json:"outboundHostPort"` - RemotePeer PeerInfo `json:"remotePeer"` - InboundExchange ExchangeSetRuntimeState `json:"inboundExchange"` - OutboundExchange ExchangeSetRuntimeState `json:"outboundExchange"` - Relayer RelayerRuntimeState `json:"relayer"` - HealthChecks []bool `json:"healthChecks,omitempty"` - LastActivity int64 `json:"lastActivity"` - SendChQueued int `json:"sendChQueued"` - SendChCapacity int `json:"sendChCapacity"` - SendBufferUsage int `json:"sendBufferUsage"` - SendBufferSize int `json:"sendBufferSize"` + ID uint32 `json:"id"` + ConnectionState string `json:"connectionState"` + LocalHostPort string `json:"localHostPort"` + RemoteHostPort string `json:"remoteHostPort"` + OutboundHostPort string `json:"outboundHostPort"` + RemotePeer PeerInfo `json:"remotePeer"` + InboundExchange ExchangeSetRuntimeState `json:"inboundExchange"` + OutboundExchange ExchangeSetRuntimeState `json:"outboundExchange"` + Relayer RelayerRuntimeState `json:"relayer"` + HealthChecks []bool `json:"healthChecks,omitempty"` + LastActivityRead int64 `json:"lastActivityRead"` + LastActivityWrite int64 `json:"lastActivityWrite"` + SendChQueued int `json:"sendChQueued"` + SendChCapacity int `json:"sendChCapacity"` + SendBufferUsage int `json:"sendBufferUsage"` + SendBufferSize int `json:"sendBufferSize"` } // RelayerRuntimeState is the runtime state for a single relayer. @@ -361,20 +362,21 @@ func (c *Connection) IntrospectState(opts *IntrospectionOptions) ConnectionRunti // TODO(prashantv): Add total number of health checks, and health check options. state := ConnectionRuntimeState{ - ID: c.connID, - ConnectionState: c.state.String(), - LocalHostPort: c.conn.LocalAddr().String(), - RemoteHostPort: c.conn.RemoteAddr().String(), - OutboundHostPort: c.outboundHP, - RemotePeer: c.remotePeerInfo, - InboundExchange: c.inbound.IntrospectState(opts), - OutboundExchange: c.outbound.IntrospectState(opts), - HealthChecks: c.healthCheckHistory.asBools(), - LastActivity: c.lastActivity.Load(), - SendChQueued: len(c.sendCh), - SendChCapacity: cap(c.sendCh), - SendBufferUsage: sendBufUsage, - SendBufferSize: sendBufSize, + ID: c.connID, + ConnectionState: c.state.String(), + LocalHostPort: c.conn.LocalAddr().String(), + RemoteHostPort: c.conn.RemoteAddr().String(), + OutboundHostPort: c.outboundHP, + RemotePeer: c.remotePeerInfo, + InboundExchange: c.inbound.IntrospectState(opts), + OutboundExchange: c.outbound.IntrospectState(opts), + HealthChecks: c.healthCheckHistory.asBools(), + LastActivityRead: c.lastActivityRead.Load(), + LastActivityWrite: c.lastActivityWrite.Load(), + SendChQueued: len(c.sendCh), + SendChCapacity: cap(c.sendCh), + SendBufferUsage: sendBufUsage, + SendBufferSize: sendBufSize, } if c.relay != nil { state.Relayer = c.relay.IntrospectState(opts)