Skip to content

Commit

Permalink
ETCM-463: Count requests as well.
Browse files Browse the repository at this point in the history
  • Loading branch information
aakoshh committed Dec 11, 2020
1 parent b4d8fd7 commit 8876c38
Showing 1 changed file with 30 additions and 15 deletions.
45 changes: 30 additions & 15 deletions src/main/scala/io/iohk/ethereum/network/PeerStatisticsActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,20 @@ class PeerStatisticsActor(

// Subscribe to messages received from handshaked peers to maintain stats.
peerEventBus ! Subscribe(MessageSubscriptionClassifier)
// Removing peers is an optimisation to free space, but eventualy the stats would be overwritten anyway.
// Removing peers is an optimisation to free space, but eventually the stats would be overwritten anyway.
peerEventBus ! Subscribe(SubscriptionClassifier.PeerDisconnectedClassifier(PeerSelector.AllPeers))

def receive: Receive = handlePeerEvents orElse handleStatsRequests

private def handlePeerEvents: Receive = {
case PeerEvent.MessageFromPeer(_, peerId) =>
case PeerEvent.MessageFromPeer(msg, peerId) =>
val now = System.currentTimeMillis()
val obs = Stat(responsesReceived = 1, firstSeenTimeMillis = Some(now), lastSeenTimeMillis = Some(now))
val obs = Stat(
responsesReceived = if (ResponseCodes(msg.code)) 1 else 0,
requestsReceived = if (RequestCodes(msg.code)) 1 else 0,
firstSeenTimeMillis = Some(now),
lastSeenTimeMillis = Some(now)
)
maybeStats = maybeStats.map(_.add(peerId, obs))

case PeerEvent.PeerDisconnected(peerId) =>
Expand All @@ -46,11 +51,12 @@ class PeerStatisticsActor(
object PeerStatisticsActor {
case class Stat(
responsesReceived: Int,
requestsReceived: Int,
firstSeenTimeMillis: Option[Long],
lastSeenTimeMillis: Option[Long]
)
object Stat {
val empty: Stat = Stat(0, None, None)
val empty: Stat = Stat(0, 0, None, None)

private def mergeOpt[A, B](x: A, y: A)(f: A => Option[B])(g: (B, B) => B): Option[B] = {
val (mx, my) = (f(x), f(y))
Expand All @@ -63,6 +69,7 @@ object PeerStatisticsActor {
(a, b) =>
Stat(
responsesReceived = a.responsesReceived + b.responsesReceived,
requestsReceived = a.requestsReceived + b.requestsReceived,
firstSeenTimeMillis = mergeOpt(a, b)(_.firstSeenTimeMillis)(math.min),
lastSeenTimeMillis = mergeOpt(a, b)(_.lastSeenTimeMillis)(math.max)
)
Expand All @@ -80,19 +87,27 @@ object PeerStatisticsActor {
case class GetStatsForPeer(window: FiniteDuration, peerId: PeerId)
case class StatsForPeer(peerId: PeerId, stat: Stat)

val ResponseCodes = Set(
Codes.NewBlockCode,
Codes.NewBlockHashesCode,
Codes.SignedTransactionsCode,
Codes.BlockHeadersCode,
Codes.BlockBodiesCode,
Codes.BlockHashesFromNumberCode,
Codes.NodeDataCode,
Codes.ReceiptsCode
)

val RequestCodes = Set(
Codes.GetBlockHeadersCode,
Codes.GetBlockBodiesCode,
Codes.GetNodeDataCode,
Codes.GetReceiptsCode
)

val MessageSubscriptionClassifier =
SubscriptionClassifier.MessageClassifier(
// Subscribe to response types, which indicate that we are getting data from that peer.
messageCodes = Set(
Codes.NewBlockCode,
Codes.NewBlockHashesCode,
Codes.SignedTransactionsCode,
Codes.BlockHeadersCode,
Codes.BlockBodiesCode,
Codes.BlockHashesFromNumberCode,
Codes.NodeDataCode,
Codes.ReceiptsCode
),
messageCodes = RequestCodes union ResponseCodes,
peerSelector = PeerSelector.AllPeers
)
}

0 comments on commit 8876c38

Please sign in to comment.