diff --git a/src/it/scala/io/iohk/ethereum/sync/util/CommonFakePeer.scala b/src/it/scala/io/iohk/ethereum/sync/util/CommonFakePeer.scala index c13a635b93..5b144e6265 100644 --- a/src/it/scala/io/iohk/ethereum/sync/util/CommonFakePeer.scala +++ b/src/it/scala/io/iohk/ethereum/sync/util/CommonFakePeer.scala @@ -32,6 +32,7 @@ import io.iohk.ethereum.network.{ KnownNodesManager, PeerEventBusActor, PeerManagerActor, + PeerStatisticsActor, ServerActor } import io.iohk.ethereum.nodebuilder.PruningConfigBuilder @@ -149,6 +150,8 @@ abstract class CommonFakePeer(peerName: String, fakePeerCustomConfig: FakePeerCu override val updateNodesInterval: FiniteDuration = 20.seconds override val shortBlacklistDuration: FiniteDuration = 1.minute override val longBlacklistDuration: FiniteDuration = 3.minutes + override val statSlotDuration: FiniteDuration = 1.minute + override val statSlotCount: Int = 30 } lazy val peerEventBus = system.actorOf(PeerEventBusActor.props, "peer-event-bus") @@ -167,12 +170,16 @@ abstract class CommonFakePeer(peerName: String, fakePeerCustomConfig: FakePeerCu lazy val authHandshaker: AuthHandshaker = AuthHandshaker(nodeKey, secureRandom) + lazy val peerStatistics = + system.actorOf(PeerStatisticsActor.props(peerEventBus, slotDuration = 1.minute, slotCount = 30)) + lazy val peerManager: ActorRef = system.actorOf( PeerManagerActor.props( peerDiscoveryManager, Config.Network.peer, peerEventBus, knownNodesManager, + peerStatistics, handshaker, authHandshaker, EthereumMessageDecoder, diff --git a/src/it/scala/io/iohk/ethereum/txExecTest/util/DumpChainApp.scala b/src/it/scala/io/iohk/ethereum/txExecTest/util/DumpChainApp.scala index 68576a635f..eed606728b 100644 --- a/src/it/scala/io/iohk/ethereum/txExecTest/util/DumpChainApp.scala +++ b/src/it/scala/io/iohk/ethereum/txExecTest/util/DumpChainApp.scala @@ -18,6 +18,7 @@ import io.iohk.ethereum.ledger.{InMemoryWorldStateProxy, InMemoryWorldStateProxy import io.iohk.ethereum.mpt.MptNode import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo import io.iohk.ethereum.network.PeerManagerActor.PeerConfiguration +import io.iohk.ethereum.network.PeerStatisticsActor import io.iohk.ethereum.network.discovery.DiscoveryConfig import io.iohk.ethereum.network.handshaker.{EtcHandshaker, EtcHandshakerConfiguration, Handshaker} import io.iohk.ethereum.network.p2p.EthereumMessageDecoder @@ -60,6 +61,8 @@ object DumpChainApp extends App with NodeKeyBuilder with SecureRandomBuilder wit override val updateNodesInterval: FiniteDuration = 20.seconds override val shortBlacklistDuration: FiniteDuration = 1.minute override val longBlacklistDuration: FiniteDuration = 3.minutes + override val statSlotDuration: FiniteDuration = 1.minute + override val statSlotCount: Int = 30 } val actorSystem = ActorSystem("mantis_system") @@ -91,17 +94,20 @@ object DumpChainApp extends App with NodeKeyBuilder with SecureRandomBuilder wit val peerMessageBus = actorSystem.actorOf(PeerEventBusActor.props) + val peerStatistics = actorSystem.actorOf(PeerStatisticsActor.props(peerMessageBus, 1.minute, 30)) + val peerManager = actorSystem.actorOf( PeerManagerActor.props( peerDiscoveryManager = actorSystem.deadLetters, // TODO: fixme peerConfiguration = peerConfig, peerMessageBus = peerMessageBus, + peerStatistics = peerStatistics, knownNodesManager = actorSystem.deadLetters, // TODO: fixme handshaker = handshaker, authHandshaker = authHandshaker, messageDecoder = EthereumMessageDecoder, - discoveryConfig, - Config.Network.protocolVersion + discoveryConfig = discoveryConfig, + bestProtocolVersion = Config.Network.protocolVersion ), "peer-manager" ) diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 9352e34f05..49276b2660 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -160,6 +160,12 @@ mantis { # Peer which disconnect during tcp connection becouse of other reasons will not be retried for this long duration # other reasons include: timeout during connection, wrong protocol, incompatible network long-blacklist-duration = 30.minutes + + # Resolution of moving window of peer statistics. + # Will be multiplied by `stat-slot-count` to give the overall length of peer statistics availability. + stat-slot-duration = 1.minute + # How many slots of peer statistics to keep in the moving window, e.g. 60 * 1.minute to keep stats for the last hour with 1 minute resolution. + stat-slot-count = 60 } rpc { diff --git a/src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala b/src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala index 2f600845e0..4a4f0546e7 100644 --- a/src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala +++ b/src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala @@ -31,6 +31,7 @@ class PeerManagerActor( peerDiscoveryManager: ActorRef, peerConfiguration: PeerConfiguration, knownNodesManager: ActorRef, + peerStatistics: ActorRef, peerFactory: (ActorContext, InetSocketAddress, Boolean) => ActorRef, discoveryConfig: DiscoveryConfig, externalSchedulerOpt: Option[Scheduler] = None @@ -52,8 +53,6 @@ class PeerManagerActor( import PeerManagerActor._ import akka.pattern.{ask, pipe} - private type PeerMap = Map[PeerId, Peer] - implicit class ConnectedPeersOps(connectedPeers: ConnectedPeers) { def outgoingConnectionDemand: Int = peerConfiguration.maxOutgoingPeers - connectedPeers.outgoingPeersCount @@ -339,6 +338,7 @@ object PeerManagerActor { peerConfiguration: PeerConfiguration, peerMessageBus: ActorRef, knownNodesManager: ActorRef, + peerStatistics: ActorRef, handshaker: Handshaker[R], authHandshaker: AuthHandshaker, messageDecoder: MessageDecoder, @@ -362,6 +362,7 @@ object PeerManagerActor { peerDiscoveryManager, peerConfiguration, knownNodesManager, + peerStatistics, peerFactory = factory, discoveryConfig ) @@ -410,6 +411,8 @@ object PeerManagerActor { val updateNodesInterval: FiniteDuration val shortBlacklistDuration: FiniteDuration val longBlacklistDuration: FiniteDuration + val statSlotDuration: FiniteDuration + val statSlotCount: Int } trait FastSyncHostConfiguration { @@ -444,5 +447,4 @@ object PeerManagerActor { case class OutgoingConnectionAlreadyHandled(uri: URI) extends ConnectionError case class PeerAddress(value: String) extends BlackListId - } diff --git a/src/main/scala/io/iohk/ethereum/network/PeerStat.scala b/src/main/scala/io/iohk/ethereum/network/PeerStat.scala new file mode 100644 index 0000000000..804faf1333 --- /dev/null +++ b/src/main/scala/io/iohk/ethereum/network/PeerStat.scala @@ -0,0 +1,31 @@ +package io.iohk.ethereum.network + +import cats._ +import cats.implicits._ + +case class PeerStat( + responsesReceived: Int, + requestsReceived: Int, + firstSeenTimeMillis: Option[Long], + lastSeenTimeMillis: Option[Long] +) +object PeerStat { + val empty: PeerStat = PeerStat(0, 0, None, None) + + private def mergeOpt[A, B](x: A, y: A)(f: A => Option[B])(g: (B, B) => B): Option[B] = { + val (mx, my) = (f(x), f(y)) + (mx, my).mapN(g) orElse mx orElse my + } + + implicit val monoid: Monoid[PeerStat] = + Monoid.instance( + empty, + (a, b) => + PeerStat( + responsesReceived = a.responsesReceived + b.responsesReceived, + requestsReceived = a.requestsReceived + b.requestsReceived, + firstSeenTimeMillis = mergeOpt(a, b)(_.firstSeenTimeMillis)(math.min), + lastSeenTimeMillis = mergeOpt(a, b)(_.lastSeenTimeMillis)(math.max) + ) + ) +} diff --git a/src/main/scala/io/iohk/ethereum/network/PeerStatisticsActor.scala b/src/main/scala/io/iohk/ethereum/network/PeerStatisticsActor.scala new file mode 100644 index 0000000000..7d9d757c5f --- /dev/null +++ b/src/main/scala/io/iohk/ethereum/network/PeerStatisticsActor.scala @@ -0,0 +1,85 @@ +package io.iohk.ethereum.network + +import akka.actor._ +import io.iohk.ethereum.network.PeerEventBusActor._ +import io.iohk.ethereum.network.p2p.messages.Codes +import java.time.Clock +import scala.concurrent.duration.FiniteDuration + +class PeerStatisticsActor( + peerEventBus: ActorRef, + var maybeStats: Option[TimeSlotStats[PeerId, PeerStat]] +) extends Actor { + import PeerStatisticsActor._ + + override def preStart(): Unit = { + // Subscribe to messages received from handshaked peers to maintain stats. + peerEventBus ! Subscribe(MessageSubscriptionClassifier) + // Removing peers is an optimisation to free space, but eventually the stats would be overwritten anyway. + peerEventBus ! Subscribe(SubscriptionClassifier.PeerDisconnectedClassifier(PeerSelector.AllPeers)) + } + + def receive: Receive = handlePeerEvents orElse handleStatsRequests + + private def handlePeerEvents: Receive = { + case PeerEvent.MessageFromPeer(msg, peerId) => + val now = System.currentTimeMillis() + val obs = PeerStat( + responsesReceived = if (ResponseCodes(msg.code)) 1 else 0, + requestsReceived = if (RequestCodes(msg.code)) 1 else 0, + firstSeenTimeMillis = Some(now), + lastSeenTimeMillis = Some(now) + ) + maybeStats = maybeStats.map(_.add(peerId, obs)) + + case PeerEvent.PeerDisconnected(peerId) => + maybeStats = maybeStats.map(_.remove(peerId)) + } + + private def handleStatsRequests: Receive = { + case GetStatsForAll(window) => + val stats = maybeStats.map(_.getAll(Some(window))).getOrElse(Map.empty) + sender ! StatsForAll(stats) + + case GetStatsForPeer(window, peerId) => + val stats = maybeStats.map(_.get(peerId, Some(window))).getOrElse(PeerStat.empty) + sender ! StatsForPeer(peerId, stats) + } +} + +object PeerStatisticsActor { + def props(peerEventBus: ActorRef, slotDuration: FiniteDuration, slotCount: Int): Props = + Props { + implicit val clock = Clock.systemUTC() + new PeerStatisticsActor(peerEventBus, TimeSlotStats[PeerId, PeerStat](slotDuration, slotCount)) + } + + case class GetStatsForAll(window: FiniteDuration) + case class StatsForAll(stats: Map[PeerId, PeerStat]) + case class GetStatsForPeer(window: FiniteDuration, peerId: PeerId) + case class StatsForPeer(peerId: PeerId, stat: PeerStat) + + val ResponseCodes = Set( + Codes.NewBlockCode, + Codes.NewBlockHashesCode, + Codes.SignedTransactionsCode, + Codes.BlockHeadersCode, + Codes.BlockBodiesCode, + Codes.BlockHashesFromNumberCode, + Codes.NodeDataCode, + Codes.ReceiptsCode + ) + + val RequestCodes = Set( + Codes.GetBlockHeadersCode, + Codes.GetBlockBodiesCode, + Codes.GetNodeDataCode, + Codes.GetReceiptsCode + ) + + val MessageSubscriptionClassifier = + SubscriptionClassifier.MessageClassifier( + messageCodes = RequestCodes union ResponseCodes, + peerSelector = PeerSelector.AllPeers + ) +} diff --git a/src/main/scala/io/iohk/ethereum/network/TimeSlotStats.scala b/src/main/scala/io/iohk/ethereum/network/TimeSlotStats.scala new file mode 100644 index 0000000000..34c4d5ddd6 --- /dev/null +++ b/src/main/scala/io/iohk/ethereum/network/TimeSlotStats.scala @@ -0,0 +1,140 @@ +package io.iohk.ethereum.network + +import cats._ +import cats.implicits._ +import java.time.Clock +import scala.concurrent.duration.{Duration, FiniteDuration} +import scala.annotation.tailrec + +/** Track statistics over time a fixed size timewindow. */ +class TimeSlotStats[K, V: Monoid] private ( + // Time resolution. + val slotDuration: FiniteDuration, + // The last written position in the buffer. + val lastIdx: Int, + // Ring buffer of slots statistics. + val buffer: TimeSlotStats.Buffer[K, V] +)(implicit clock: Clock) { + import TimeSlotStats._ + + /** Overall length of the timewindow. */ + def duration: FiniteDuration = slotDuration * slotCount + def slotCount: Int = buffer.size + + /** Merge new stats for a given key in the current timestamp. */ + def add(key: K, stat: V): TimeSlotStats[K, V] = { + val currSlotId = slotId(currentTimeMillis) + val lastEntry = buffer(lastIdx) + + if (currSlotId == lastEntry.slotId) { + // We're still in the same timeslot, so just append stats. + val newEntry = lastEntry.add(key, stat) + updated(lastIdx, newEntry) + } else if (currSlotId > lastEntry.slotId) { + // Go to the next slot. + val newIdx = succ(lastIdx) + val newEntry = Entry(currSlotId, Map(key -> stat)) + updated(newIdx, newEntry) + } else { + // Going backwards in time, just ignore it. + this + } + } + + /** Forget all statistics about a given key. */ + def remove(key: K): TimeSlotStats[K, V] = + updated(lastIdx, buffer.mapValues(_.remove(key))) + + /** Aggregate stats for a key in all slots that are within the duration. */ + def get(key: K, window: Option[Duration] = None): V = + fold(Monoid[V].empty, window getOrElse duration) { case (acc, stats) => + stats.get(key).map(acc |+| _).getOrElse(acc) + } + + /** Aggregate all stats in all slots within the duration. */ + def getAll(window: Option[Duration] = None): Map[K, V] = + fold(Map.empty[K, V], window getOrElse duration) { case (acc, stats) => + acc |+| stats + } + + private def fold[A](init: A, window: Duration)(f: (A, Map[K, V]) => A) = { + val (start, end) = slotRange(currentTimeMillis, window) + + @tailrec + def loop(idx: Int, acc: List[Map[K, V]]): List[Map[K, V]] = { + val entry = buffer(idx) + if (entry.slotId < start || end < entry.slotId) + acc + else { + val nextAcc = entry.slotStats :: acc + val nextIdx = pred(idx) + if (nextIdx == lastIdx) nextAcc else loop(nextIdx, nextAcc) + } + } + + loop(lastIdx, Nil).foldLeft(init)(f) + } + + private def currentTimeMillis: Timestamp = + clock.millis() + + /** Truncate the current timestamp based on the slot duration. */ + private def slotId(timestamp: Timestamp): Timestamp = { + timestamp - timestamp % slotDuration.toMillis + } + + /** The range of time slots based on the current timestamp and the buffer duration. */ + def slotRange(timestamp: Timestamp, window: Duration): (Timestamp, Timestamp) = { + val end = slotId(timestamp) + val start = slotId(timestamp - window.toMillis) + start -> end + } + + private def succ(idx: Int): Int = (idx + 1) % slotCount + private def pred(idx: Int): Int = if (idx == 0) slotCount - 1 else idx - 1 + + private def updated( + lastIdx: Int, + entry: Entry[K, V] + ): TimeSlotStats[K, V] = + updated(lastIdx, buffer.updated(lastIdx, entry)) + + private def updated( + lastIdx: Int, + buffer: Buffer[K, V] + ): TimeSlotStats[K, V] = + new TimeSlotStats[K, V](slotDuration, lastIdx, buffer) +} + +object TimeSlotStats { + + // Milliseconds since epoch. + type Timestamp = Long + // Using Map as a persistent ringbuffer because of frequent updates. + type Buffer[K, V] = Map[Int, Entry[K, V]] + + case class Entry[K, V: Monoid]( + slotId: Timestamp, + slotStats: Map[K, V] + ) { + def add(key: K, stat: V): Entry[K, V] = + copy(slotStats = slotStats |+| Map(key -> stat)) + + def remove(key: K): Entry[K, V] = + copy(slotStats = slotStats - key) + } + + def apply[K, V: Monoid]( + slotDuration: FiniteDuration, + slotCount: Int + )(implicit clock: Clock): Option[TimeSlotStats[K, V]] = + if (slotDuration == Duration.Zero || slotCount <= 0) None + else + Some { + new TimeSlotStats[K, V]( + slotDuration, + lastIdx = slotCount - 1, // So the first slot we fill is going to be 0. + buffer = Range(0, slotCount).map(_ -> Entry(0L, Map.empty[K, V])).toMap + ) + } +} diff --git a/src/main/scala/io/iohk/ethereum/nodebuilder/NodeBuilder.scala b/src/main/scala/io/iohk/ethereum/nodebuilder/NodeBuilder.scala index bc75b12dfb..3f48393a21 100644 --- a/src/main/scala/io/iohk/ethereum/nodebuilder/NodeBuilder.scala +++ b/src/main/scala/io/iohk/ethereum/nodebuilder/NodeBuilder.scala @@ -185,6 +185,21 @@ trait PeerEventBusBuilder { lazy val peerEventBus: ActorRef = system.actorOf(PeerEventBusActor.props, "peer-event-bus") } +trait PeerStatisticsBuilder { + self: ActorSystemBuilder with PeerEventBusBuilder => + + lazy val peerStatistics: ActorRef = system.actorOf( + PeerStatisticsActor.props( + peerEventBus, + // `slotCount * slotDuration` should be set so that it's at least as long + // as any client of the `PeerStatisticsActor` requires. + slotDuration = Config.Network.peer.statSlotDuration, + slotCount = Config.Network.peer.statSlotCount + ), + "peer-statistics" + ) +} + trait PeerManagerActorBuilder { self: ActorSystemBuilder @@ -194,7 +209,8 @@ trait PeerManagerActorBuilder { with PeerDiscoveryManagerBuilder with DiscoveryConfigBuilder with StorageBuilder - with KnownNodesManagerBuilder => + with KnownNodesManagerBuilder + with PeerStatisticsBuilder => lazy val peerConfiguration: PeerConfiguration = Config.Network.peer @@ -204,6 +220,7 @@ trait PeerManagerActorBuilder { Config.Network.peer, peerEventBus, knownNodesManager, + peerStatistics, handshaker, authHandshaker, EthereumMessageDecoder, @@ -637,6 +654,7 @@ trait Node with NodeStatusBuilder with ForkResolverBuilder with HandshakerBuilder + with PeerStatisticsBuilder with PeerManagerActorBuilder with ServerActorBuilder with SyncControllerBuilder diff --git a/src/main/scala/io/iohk/ethereum/utils/Config.scala b/src/main/scala/io/iohk/ethereum/utils/Config.scala index baa93a4dbc..2073951116 100644 --- a/src/main/scala/io/iohk/ethereum/utils/Config.scala +++ b/src/main/scala/io/iohk/ethereum/utils/Config.scala @@ -87,6 +87,8 @@ object Config { val shortBlacklistDuration: FiniteDuration = peerConfig.getDuration("short-blacklist-duration").toMillis.millis val longBlacklistDuration: FiniteDuration = peerConfig.getDuration("long-blacklist-duration").toMillis.millis + val statSlotDuration: FiniteDuration = peerConfig.getDuration("stat-slot-duration").toMillis.millis + val statSlotCount: Int = peerConfig.getInt("stat-slot-count") } } diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/BlockchainHostActorSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/BlockchainHostActorSpec.scala index 0f8aec9ceb..e266145016 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/BlockchainHostActorSpec.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/BlockchainHostActorSpec.scala @@ -289,6 +289,8 @@ class BlockchainHostActorSpec extends AnyFlatSpec with Matchers { override val updateNodesInterval: FiniteDuration = 20.seconds override val shortBlacklistDuration: FiniteDuration = 1.minute override val longBlacklistDuration: FiniteDuration = 3.minutes + override val statSlotDuration: FiniteDuration = 1.minute + override val statSlotCount: Int = 30 } val baseBlockHeader = Fixtures.Blocks.Block3125369.header diff --git a/src/test/scala/io/iohk/ethereum/network/PeerManagerSpec.scala b/src/test/scala/io/iohk/ethereum/network/PeerManagerSpec.scala index 318db0892d..c420b254f9 100644 --- a/src/test/scala/io/iohk/ethereum/network/PeerManagerSpec.scala +++ b/src/test/scala/io/iohk/ethereum/network/PeerManagerSpec.scala @@ -265,6 +265,7 @@ class PeerManagerSpec val peerDiscoveryManager = TestProbe() val peerEventBus = TestProbe() val knownNodesManager = TestProbe() + val peerStatistics = TestProbe() val bootstrapNodes: Set[Node] = DiscoveryConfig(Config.config, Config.blockchains.blockchainConfig.bootstrapNodes).bootstrapNodes @@ -310,6 +311,7 @@ class PeerManagerSpec peerDiscoveryManager.ref, peerConfiguration, knownNodesManager.ref, + peerStatistics.ref, peerFactory, discoveryConfig, Some(time.scheduler) diff --git a/src/test/scala/io/iohk/ethereum/network/PeerStatisticsSpec.scala b/src/test/scala/io/iohk/ethereum/network/PeerStatisticsSpec.scala new file mode 100644 index 0000000000..02990bc965 --- /dev/null +++ b/src/test/scala/io/iohk/ethereum/network/PeerStatisticsSpec.scala @@ -0,0 +1,66 @@ +package io.iohk.ethereum.network + +import akka.actor._ +import akka.testkit.{TestKit, TestProbe} +import io.iohk.ethereum.network.PeerEventBusActor._ +import io.iohk.ethereum.network.p2p.messages.PV61.NewBlockHashes +import io.iohk.ethereum.WithActorSystemShutDown +import org.scalatest.flatspec.AnyFlatSpecLike +import org.scalatest.matchers.should.Matchers +import scala.concurrent.duration._ + +class PeerStatisticsSpec + extends TestKit(ActorSystem("PeerStatisticsSpec_System")) + with AnyFlatSpecLike + with WithActorSystemShutDown + with Matchers { + + import PeerStatisticsActor._ + + behavior of "PeerStatisticsActor" + + it should "subscribe to peer events" in new Fixture { + peerEventBus.expectMsg(Subscribe(PeerStatisticsActor.MessageSubscriptionClassifier)) + peerEventBus.expectMsg(Subscribe(SubscriptionClassifier.PeerDisconnectedClassifier(PeerSelector.AllPeers))) + } + + it should "initially return default stats for unknown peers" in new Fixture { + val peerId = PeerId("Alice") + peerStatistics ! GetStatsForPeer(1.minute, peerId) + sender.expectMsg(StatsForPeer(peerId, PeerStat.empty)) + } + + it should "initially return default stats when there are no peers" in new Fixture { + peerStatistics ! GetStatsForAll(1.minute) + sender.expectMsg(StatsForAll(Map.empty)) + } + + it should "count received messages" in new Fixture { + val alice = PeerId("Alice") + val bob = PeerId("Bob") + peerStatistics ! PeerEvent.MessageFromPeer(NewBlockHashes(Seq.empty), alice) + peerStatistics ! PeerEvent.MessageFromPeer(NewBlockHashes(Seq.empty), bob) + peerStatistics ! PeerEvent.MessageFromPeer(NewBlockHashes(Seq.empty), alice) + peerStatistics ! GetStatsForAll(1.minute) + + val stats = sender.expectMsgType[StatsForAll] + stats.stats should not be empty + + val statA = stats.stats(alice) + statA.responsesReceived shouldBe 2 + statA.lastSeenTimeMillis shouldBe >(statA.firstSeenTimeMillis) + + val statB = stats.stats(bob) + statB.responsesReceived shouldBe 1 + statB.lastSeenTimeMillis shouldBe statB.firstSeenTimeMillis + } + + trait Fixture { + val sender = TestProbe() + implicit val senderRef = sender.ref + + val peerEventBus = TestProbe() + val peerStatistics = + system.actorOf(PeerStatisticsActor.props(peerEventBus.ref, slotDuration = 1.minute, slotCount = 30)) + } +} diff --git a/src/test/scala/io/iohk/ethereum/network/TimeSlotStatsSpec.scala b/src/test/scala/io/iohk/ethereum/network/TimeSlotStatsSpec.scala new file mode 100644 index 0000000000..eef32794ed --- /dev/null +++ b/src/test/scala/io/iohk/ethereum/network/TimeSlotStatsSpec.scala @@ -0,0 +1,289 @@ +package io.iohk.ethereum.network + +import cats._ +import cats.implicits._ +import cats.data.State +import cats.kernel.Monoid +import java.time.Clock +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import org.scalatest.Inspectors +import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks +import org.scalacheck.{Arbitrary, Gen, Shrink}, Arbitrary.arbitrary +import scala.concurrent.duration._ +import org.scalatest.compatible.Assertion +import java.time.ZoneId +import java.time.Instant + +class TimeSlotStatsSpec extends AnyFlatSpec with Matchers with ScalaCheckDrivenPropertyChecks { + import TimeSlotStatsSpec._ + + behavior of "TimeSlotStats" + + it should "add new keys to the last timeslot" in test { + for { + stats <- add("foo", 1) + } yield { + stats.buffer(0).slotStats("foo") shouldBe 1 + } + } + + it should "merge keys in the last timeslot" in test { + for { + _ <- add("foo", 1) + _ <- add("foo", 2) + _ <- windClock(10.millis) + _ <- add("bar", 0) + stats <- getStats + } yield { + stats.buffer(0).slotStats("foo") shouldBe 3 + stats.buffer(0).slotStats("bar") shouldBe 0 + } + } + + it should "ignore updates for earlier timeslots" in test { + for { + stats0 <- add("foo", 1) + _ <- windClock(-defaultSlotDuration - 1.millis) + stats1 <- add("foo", 2) + } yield { + stats0.buffer shouldBe stats1.buffer + } + } + + it should "add new slots for the next timeslot" in test { + for { + _ <- add("foo", 1) + _ <- windClock(defaultSlotDuration + 1.millis) + _ <- add("foo", 2) + stats <- getStats + } yield { + stats.buffer(0).slotStats("foo") shouldBe 1 + stats.buffer(1).slotStats("foo") shouldBe 2 + } + } + + it should "remove keys from all slots" in test { + for { + _ <- add("foo", 1) + _ <- add("bar", 2) + _ <- windClock(defaultSlotDuration + 1.millis) + _ <- add("foo", 3) + _ <- windClock(1.millis) + _ <- add("bar", 4) + _ <- remove("foo") + stats <- getStats + } yield { + Inspectors.forAll(stats.buffer.values) { entry => + entry.slotStats should not contain key("foo") + } + Inspectors.forExactly(2, stats.buffer.values) { entry => + entry.slotStats should contain key ("bar") + } + } + } + + it should "turn around and overwrite the first slot after all of them have been written to" in test { + for { + _ <- Range + .inclusive(0, defaultSlotCount) + .map { i => + add("foo", i) >> windClock(defaultSlotDuration) + } + .toList + .sequence + stats <- getStats + } yield { + stats.buffer(0).slotStats("foo") shouldBe defaultSlotCount + stats.buffer(1).slotStats("foo") shouldBe 1 + } + } + + def testAggregate(f: TestState[String, Int, Assertion]) = test { + val setup = for { + _ <- add("foo", 1) + _ <- windClock(defaultSlotDuration * defaultSlotCount) // puts t0 out of scope + _ <- add("bar", 2) + _ <- windClock(defaultSlotDuration) + _ <- add("foo", 3) + _ <- add("bar", 4) + s <- getStats + } yield { + s.lastIdx shouldBe 2 + () + } + + setup >> f + } + + it should "aggregate the stats of a given key" in testAggregate { + for { + stats <- getStats + } yield { + stats.get("foo") shouldBe 3 + stats.get("bar") shouldBe 6 + stats.get("BAR") shouldBe 0 + } + } + + it should "aggregate all stats" in testAggregate { + for { + stats <- getStats + } yield { + stats.getAll() shouldBe Map("foo" -> 3, "bar" -> 6) + } + } + + it should "aggregate stats that still fall in the window" in testAggregate { + for { + _ <- windClock(defaultSlotDuration * 2) + stats <- getStats + } yield { + stats.getAll() should not be empty + } + } + + it should "not aggregate beyond the window" in testAggregate { + for { + _ <- windClock(defaultSlotDuration * (defaultSlotCount + 1)) + stats <- getStats + } yield { + stats.getAll() shouldBe empty + } + } + + it should "handle 0 in configuration" in { + // This might happen if we base the values on something which can be 0. + implicit val clock = Clock.systemUTC() + val zeros = List( + TimeSlotStats[String, Int](slotDuration = 1.minutes, slotCount = 0), + TimeSlotStats[String, Int](slotDuration = 0.minutes, slotCount = 1) + ) + Inspectors.forAll(zeros) { + _ shouldBe empty + } + } + + it should "aggregate Int" in { + testRandomAggregation[String, Int](_ + _) + } + + it should "aggregate Boolean" in { + implicit val boolMonoid: Monoid[Boolean] = + Monoid.instance[Boolean](false, _ || _) + testRandomAggregation[Int, Boolean](_ || _) + } + + it should "aggregate Set" in { + testRandomAggregation[Int, Set[Int]](_ union _) + } + + it should "aggregate Vector" in { + testRandomAggregation[Int, Vector[Int]](_ ++ _) + } + + def testRandomAggregation[K: Arbitrary, V: Arbitrary: Monoid](f: (V, V) => V): Unit = { + forAll(genTimeSlotStats[K, V]) { case (stats, clock, window) => + val timestamp = clock.millis() + val (start, end) = stats.slotRange(timestamp, window) + + val windowBuffer = stats.buffer.values + .filter { entry => + start <= entry.slotId && entry.slotId <= end + } + .toVector + .sortBy(_.slotId) + + val all = stats.getAll(Some(window)) + + if (windowBuffer.exists(_.slotStats.nonEmpty)) { + all should not be empty + } else { + all shouldBe empty + } + + Inspectors.forAll(all.keySet) { key => + val keyStats = windowBuffer.flatMap { + _.slotStats.get(key) + } + val expected = keyStats.reduce(f) + + all(key) shouldBe expected + stats.get(key, Some(window)) shouldBe expected + } + } + } +} + +object TimeSlotStatsSpec { + + class MockClock( + private var currentTimeMillis: Long = System.currentTimeMillis, + zoneId: ZoneId = ZoneId.of("UTC") + ) extends Clock { + def windByMillis(by: Long): Unit = + currentTimeMillis = currentTimeMillis + by + + override def instant(): Instant = Instant.ofEpochMilli(currentTimeMillis) + // The following are implemented for completness' sake but not used: + override def getZone(): ZoneId = zoneId + override def withZone(x: ZoneId): Clock = new MockClock(currentTimeMillis, zoneId) + } + + type TestState[K, V, A] = State[(TimeSlotStats[K, V], MockClock), A] + + val defaultSlotDuration = 1.minute + val defaultSlotCount = 30 + + def getStats[K, V]: TestState[K, V, TimeSlotStats[K, V]] = + State.get.map(_._1) + + def modStats[K, V](f: TimeSlotStats[K, V] => TimeSlotStats[K, V]): TestState[K, V, Unit] = + State.modify { case (stats, clock) => + (f(stats), clock) + } + + def add[K, V](key: K, value: V): TestState[K, V, TimeSlotStats[K, V]] = + modStats[K, V](_.add(key, value)) >> getStats[K, V] + + def remove[K, V](key: K): TestState[K, V, TimeSlotStats[K, V]] = + modStats[K, V](_.remove(key)) >> getStats[K, V] + + def windClock[K, V](by: FiniteDuration): TestState[K, V, Unit] = + State.modify { case (stats, clock) => + clock.windByMillis(by.toMillis) + (stats, clock) + } + + def test[K, V: Monoid](s: TestState[K, V, Assertion]): Unit = { + implicit val clock = new MockClock() + val stats = TimeSlotStats[K, V](defaultSlotDuration, defaultSlotCount).get + s.run(stats -> clock).value + } + + implicit def noShrink[T]: Shrink[T] = + Shrink[T](_ => Stream.empty) + + def genTimeSlotStats[K: Arbitrary, V: Arbitrary: Monoid]: Gen[(TimeSlotStats[K, V], MockClock, FiniteDuration)] = + for { + slotDuration <- Gen.choose(1, 5 * 60).map(_.seconds) + slotCount <- Gen.choose(1, 30) + keyCount <- Gen.choose(1, 5) + keys <- Gen.listOfN(keyCount, arbitrary[K]) + eventCount <- Gen.choose(0, 100) + timestamp = System.currentTimeMillis + event = for { + d <- Gen.choose(0, 10 * 60).map(_.seconds) + k <- Gen.oneOf(keys) + v <- arbitrary[V] + } yield (d, k, v) + events <- Gen.listOfN(eventCount, event) + clock = new MockClock() + empty = TimeSlotStats[K, V](slotDuration, slotCount)(Monoid[V], clock).get + stats = events.foldLeft(empty) { case (stats, (duration, key, stat)) => + clock.windByMillis(duration.toMillis) + stats.add(key, stat) + } + window <- Gen.choose(0, stats.duration.toSeconds * 2).map(_.seconds) + } yield (stats, clock, window) +} diff --git a/src/test/scala/io/iohk/ethereum/network/p2p/PeerActorSpec.scala b/src/test/scala/io/iohk/ethereum/network/p2p/PeerActorSpec.scala index b71846cc7c..a65cead9eb 100644 --- a/src/test/scala/io/iohk/ethereum/network/p2p/PeerActorSpec.scala +++ b/src/test/scala/io/iohk/ethereum/network/p2p/PeerActorSpec.scala @@ -517,6 +517,8 @@ class PeerActorSpec override val updateNodesInterval: FiniteDuration = 20.seconds override val shortBlacklistDuration: FiniteDuration = 1.minute override val longBlacklistDuration: FiniteDuration = 3.minutes + override val statSlotDuration: FiniteDuration = 1.minute + override val statSlotCount: Int = 30 } }