Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ETCM-463: Add PeerStatisticsActor to track message counts #849

Merged
merged 33 commits into from
Dec 15, 2020
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
657ff62
ETCM-463: Add TimeSlotStats.
aakoshh Dec 9, 2020
7b60f10
ETCM-463: Ignore zero config.
aakoshh Dec 9, 2020
893e71e
ETCM-463: Maintain a message counter in PeerManagerActor.
aakoshh Dec 9, 2020
69a1093
ETCM-463: Handle zero in the constructor.
aakoshh Dec 9, 2020
4e065d7
ETCM-463: Test that only the removed key disappears.
aakoshh Dec 9, 2020
bd83cb1
ETCM-463: Prevent using copy.
aakoshh Dec 9, 2020
92e48b3
ETCM-463: Merge the two vectors.
aakoshh Dec 9, 2020
8637bea
Merge remote-tracking branch 'origin/develop' into ETCM-463-track-pee…
aakoshh Dec 9, 2020
9cca124
ETCM-463: Test again that aggregate filtering works.
aakoshh Dec 9, 2020
cc48ceb
ETCM-463: Save some merging if key isn't found.
aakoshh Dec 10, 2020
788829b
Merge remote-tracking branch 'origin/develop' into ETCM-463-track-pee…
aakoshh Dec 10, 2020
d71957d
ETCM-463: ScalaCheck test for aggregation. Fix looping with 1 sized a…
aakoshh Dec 10, 2020
f32ad6d
ETCM-463: Moved stats to a new PeerStatisticsActor.
aakoshh Dec 10, 2020
5c86584
ETCM-463: Passing the PeerStatisticsActor to the PeerManagerActor.
aakoshh Dec 10, 2020
40d9731
ETCM-463: Use a dedicated Stat with a responsesReceived rather than a…
aakoshh Dec 10, 2020
2b4e619
ETCM-463: Tests for PeerStatisticsActor
aakoshh Dec 10, 2020
1fb7ca9
ETCM-463: Fix infitite loop.
aakoshh Dec 10, 2020
8eec318
ETCM-463: Add new parameter in IT.
aakoshh Dec 10, 2020
62ba18e
ETCM-463: Use java.time.Clock
aakoshh Dec 11, 2020
7fae250
ETCM-463: Add first/last seen timestamps.
aakoshh Dec 11, 2020
ddb8cb9
ETCM-463: Accept a time window to filter stats.
aakoshh Dec 11, 2020
fa2e363
ETCM-463: Separate config for stats.
aakoshh Dec 11, 2020
60b1baa
Merge remote-tracking branch 'origin/develop' into ETCM-463-track-pee…
aakoshh Dec 11, 2020
b4d8fd7
ETCM-463: All may be empty in test if window is small.
aakoshh Dec 11, 2020
8876c38
ETCM-463: Count requests as well.
aakoshh Dec 11, 2020
ce72804
ETCM-463: Use Map instead of Vector.
aakoshh Dec 11, 2020
505dda9
ETCM-463: Make sure order is preserved.
aakoshh Dec 13, 2020
3ff45f2
ETCM-463: Move subscription to preStart.
aakoshh Dec 14, 2020
a6d8676
ETCM-463: Move PeerStat to a separate file.
aakoshh Dec 14, 2020
c4a6107
ETCM-463: Add @tailrec
aakoshh Dec 14, 2020
7ab285c
ETCM-463: Implement withZone.
aakoshh Dec 15, 2020
33ff543
Merge remote-tracking branch 'origin/develop' into ETCM-463-track-pee…
aakoshh Dec 15, 2020
3f2dc19
Merge remote-tracking branch 'origin/develop' into ETCM-463-track-pee…
aakoshh Dec 15, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success}
import io.iohk.ethereum.network.p2p.messages.Codes

class PeerManagerActor(
peerEventBus: ActorRef,
Expand Down Expand Up @@ -52,8 +53,6 @@ class PeerManagerActor(
import PeerManagerActor._
import akka.pattern.{ask, pipe}

private type PeerMap = Map[PeerId, Peer]

implicit class ConnectedPeersOps(connectedPeers: ConnectedPeers) {
def outgoingConnectionDemand: Int =
peerConfiguration.maxOutgoingPeers - connectedPeers.outgoingPeersCount
Expand All @@ -71,6 +70,15 @@ class PeerManagerActor(
// Subscribe to the handshake event of any peer
peerEventBus ! Subscribe(SubscriptionClassifier.PeerHandshaked)

// Subscribe to messages received from peers to maintain stats.
peerEventBus ! Subscribe(MessageSubscriptionClassifier)
aakoshh marked this conversation as resolved.
Show resolved Hide resolved

var maybeMessageStats = TimeSlotStats[PeerId, Int](
slotDuration = 1.minute,
// TODO: This could be set based on min-prune-age.
slotCount = peerConfiguration.longBlacklistDuration.toMinutes.toInt
)

def scheduler: Scheduler = externalSchedulerOpt getOrElse context.system.scheduler

override val supervisorStrategy: OneForOneStrategy =
Expand Down Expand Up @@ -254,6 +262,7 @@ class PeerManagerActor(
val (terminatedPeersIds, newConnectedPeers) = connectedPeers.removeTerminatedPeer(ref)
terminatedPeersIds.foreach { peerId =>
peerEventBus ! Publish(PeerEvent.PeerDisconnected(peerId))
maybeMessageStats = maybeMessageStats.map(_.remove(peerId))
}
// Try to replace a lost connection with another one.
if (newConnectedPeers.outgoingConnectionDemand > 0) {
Expand All @@ -278,6 +287,9 @@ class PeerManagerActor(
} else {
context become listening(connectedPeers.promotePeerToHandshaked(handshakedPeer))
}

case PeerEvent.MessageFromPeer(_, peerId) =>
maybeMessageStats = maybeMessageStats.map(_.add(peerId, 1))
}

private def createPeer(
Expand Down Expand Up @@ -445,4 +457,20 @@ object PeerManagerActor {

case class PeerAddress(value: String) extends BlackListId

val MessageSubscriptionClassifier =
SubscriptionClassifier.MessageClassifier(
// Subscribe to response types, which indidate that we are getting data from that peer.
messageCodes = Set(
Codes.NewBlockCode,
Codes.NewBlockHashesCode,
Codes.SignedTransactionsCode,
Codes.BlockHeadersCode,
Codes.BlockBodiesCode,
Codes.BlockHashesFromNumberCode,
Codes.NodeDataCode,
Codes.ReceiptsCode
),
peerSelector = PeerSelector.AllPeers
)

}
129 changes: 129 additions & 0 deletions src/main/scala/io/iohk/ethereum/network/TimeSlotStats.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package io.iohk.ethereum.network

import cats._
import cats.implicits._
import scala.concurrent.duration.{Duration, FiniteDuration}

/** Track statistics over time a fixed size timewindow. */
class TimeSlotStats[K, V: Monoid] private (
// Time resolution.
val slotDuration: FiniteDuration,
// The last written position in the buffer.
val lastIdx: Int,
// Ring buffer of slots statistics.
val buffer: IndexedSeq[TimeSlotStats.Entry[K, V]]
) {
import TimeSlotStats._

/** Overall length of the timewindow. */
def duration: FiniteDuration = slotDuration * slotCount
def slotCount: Int = buffer.size

/** Merge new stats for a given key in the current timestamp. */
def add(key: K, stat: V, timestamp: Timestamp = System.currentTimeMillis): TimeSlotStats[K, V] = {
aakoshh marked this conversation as resolved.
Show resolved Hide resolved
val currSlotId = slotId(timestamp)
val lastEntry = buffer(lastIdx)

if (currSlotId == lastEntry.slotId) {
// We're still in the same timeslot, so just append stats.
val newEntry = lastEntry.add(key, stat)
updated(lastIdx, newEntry)
} else if (currSlotId > lastEntry.slotId) {
// Go to the next slot.
val newIdx = succ(lastIdx)
val newEntry = Entry(currSlotId, Map(key -> stat))
updated(newIdx, newEntry)
} else {
// Going backwards in time, just ignore it.
this
}
}

/** Forget all statistics about a given key. */
def remove(key: K): TimeSlotStats[K, V] =
updated(lastIdx, buffer.map(_.remove(key)))

/** Aggregate stats for a key in all slots that are within the duration. */
def get(key: K, timestamp: Timestamp = System.currentTimeMillis): V =
fold(timestamp)(Monoid[V].empty) { case (acc, stats) =>
stats.get(key).map(acc |+| _).getOrElse(acc)
}

/** Aggregate all stats in all slots within the duration. */
def getAll(timestamp: Timestamp = System.currentTimeMillis): Map[K, V] =
fold(timestamp)(Map.empty[K, V]) { case (acc, stats) =>
acc |+| stats
}

private def fold[A](timestamp: Timestamp)(init: A)(f: (A, Map[K, V]) => A) = {
val (start, end) = slotRange(timestamp)

def loop(idx: Int, acc: A): A = {
val entry = buffer(idx)
if (entry.slotId < start || end < entry.slotId)
acc
else
loop(pred(idx), f(acc, entry.slotStats))
}

loop(lastIdx, init)
}

/** Truncate the current timestamp based on the slot duration. */
private def slotId(timestamp: Timestamp): Timestamp = {
timestamp - timestamp % slotDuration.toMillis
}

/** The range of time slots based on the current timestamp and the buffer duration. */
private def slotRange(timestamp: Timestamp): (Timestamp, Timestamp) = {
val end = slotId(timestamp)
val start = slotId(timestamp - duration.toMillis)
start -> end
}

private def succ(idx: Int): Int = (idx + 1) % slotCount
private def pred(idx: Int): Int = (idx - 1) % slotCount

private def updated(
lastIdx: Int,
entry: Entry[K, V]
): TimeSlotStats[K, V] =
updated(lastIdx, buffer.updated(lastIdx, entry))

private def updated(
lastIdx: Int,
buffer: IndexedSeq[Entry[K, V]]
): TimeSlotStats[K, V] =
new TimeSlotStats[K, V](slotDuration, lastIdx, buffer)
}

object TimeSlotStats {

// Milliseconds since epoch.
type Timestamp = Long

case class Entry[K, V: Monoid](
slotId: Timestamp,
slotStats: Map[K, V]
) {
def add(key: K, stat: V): Entry[K, V] =
copy(slotStats = slotStats |+| Map(key -> stat))

def remove(key: K): Entry[K, V] =
copy(slotStats = slotStats - key)
}

def apply[K, V: Monoid](
slotDuration: FiniteDuration,
slotCount: Int
): Option[TimeSlotStats[K, V]] =
if (slotDuration == Duration.Zero || slotCount <= 0) None
else
Some {
new TimeSlotStats[K, V](
slotDuration,
lastIdx = slotCount - 1, // So the first slot we fill will move to 0.
buffer = IndexedSeq.fill(slotCount)(Entry(0L, Map.empty[K, V]))
)
}
}
2 changes: 2 additions & 0 deletions src/test/scala/io/iohk/ethereum/network/PeerManagerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class PeerManagerSpec

time.advance(21000) // connect to 2 bootstrap peers

peerEventBus.expectMsg(Subscribe(PeerManagerActor.MessageSubscriptionClassifier))
peerEventBus.expectMsg(Publish(PeerDisconnected(PeerId(probe.ref.path.name))))
}

Expand Down Expand Up @@ -158,6 +159,7 @@ class PeerManagerSpec
// Peer(3) after receiving disconnect schedules poison pill for himself
probe3.ref ! PoisonPill

peerEventBus.expectMsg(Subscribe(PeerManagerActor.MessageSubscriptionClassifier))
peerEventBus.expectMsg(Publish(PeerDisconnected(PeerId(probe3.ref.path.name))))
}

Expand Down
116 changes: 116 additions & 0 deletions src/test/scala/io/iohk/ethereum/network/TimeSlotStatsSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package io.iohk.ethereum.network

import org.scalatest.flatspec.AnyFlatSpec
import scala.concurrent.duration._
import org.scalatest.matchers.should.Matchers
import org.scalatest.Inspectors

class TimeSlotStatsSpec extends AnyFlatSpec with Matchers {
behavior of "TimeSlotStats"

val emptyStats = TimeSlotStats[String, Int](slotDuration = 1.minute, slotCount = 30).get

it should "add new keys to the last timeslot" in {
val stats = emptyStats.add("foo", 1)
stats.buffer(0).slotStats("foo") shouldBe 1
}

it should "merge keys in the last timeslot" in {
val stats = emptyStats.add("foo", 1).add("foo", 2).add("bar", 0, timestamp = System.currentTimeMillis + 10)
stats.buffer(0).slotStats("foo") shouldBe 3
stats.buffer(0).slotStats("bar") shouldBe 0
}

it should "ignore updates for earlier timeslots" in {
val stats0 = emptyStats.add("foo", 1)
val stats1 = stats0.add("foo", 2, timestamp = System.currentTimeMillis - emptyStats.slotDuration.toMillis - 1)

stats0 shouldBe stats1
}

it should "add new slots for the next timeslot" in {
val stats = emptyStats
.add("foo", 1)
.add("foo", 2, timestamp = System.currentTimeMillis + emptyStats.slotDuration.toMillis + 1)

stats.buffer(0).slotStats("foo") shouldBe 1
stats.buffer(1).slotStats("foo") shouldBe 2
}

it should "remove keys from all slots" in {
val stats = emptyStats
.add("foo", 1)
.add("bar", 2)
.add("foo", 3, timestamp = System.currentTimeMillis + emptyStats.slotDuration.toMillis + 1)
.add("bar", 4, timestamp = System.currentTimeMillis + emptyStats.slotDuration.toMillis + 2)
.remove("foo")

Inspectors.forAll(stats.buffer) { entry =>
entry.slotStats should not contain key("foo")
}
Inspectors.forExactly(2, stats.buffer) { entry =>
entry.slotStats should contain key ("bar")
}
}

it should "turn around and overwrite the first slot after all of them have been written to" in {
val stats = Range
.inclusive(0, emptyStats.slotCount)
.map(i => i -> (System.currentTimeMillis + emptyStats.slotDuration.toMillis * i))
.foldLeft(emptyStats) { case (stats, (i, timestamp)) =>
stats.add("foo", i, timestamp)
}

stats.buffer(0).slotStats("foo") shouldBe emptyStats.slotCount
stats.buffer(1).slotStats("foo") shouldBe 1
}

it should "aggregate the stats of a given key" in new AggregateFixture {
stats.get("foo") shouldBe 0
stats.get("foo", ts1) shouldBe 0
stats.get("bar", ts1) shouldBe 0
stats.get("foo", ts2) shouldBe 3
stats.get("bar", ts2) shouldBe 6
stats.get("BAR", ts2) shouldBe 0
}

it should "aggregate all stats" in new AggregateFixture {
stats.getAll() shouldBe empty
stats.getAll(ts1) shouldBe empty
stats.getAll(ts2) shouldBe Map("foo" -> 3, "bar" -> 6)
}

it should "aggregate stats in the past within the window" in new AggregateFixture {
stats.getAll(ts2 + slotMillis * 2) should not be empty
}

it should "not aggregate beyond the window" in new AggregateFixture {
stats.getAll(ts2 + slotMillis * (stats.slotCount + 1)) shouldBe empty
}

trait AggregateFixture {
val slotMillis = emptyStats.slotDuration.toMillis
val ts0 = System.currentTimeMillis
val ts1 = ts0 + slotMillis * emptyStats.slotCount // puts t0 out of scope
val ts2 = ts1 + slotMillis

val stats = emptyStats
.add("foo", 1, ts0)
.add("bar", 2, ts1)
.add("foo", 3, ts2)
.add("bar", 4, ts2)

stats.lastIdx shouldBe 2
}

it should "handle 0 in configuration" in {
// This might happen if we base the values on something which can be 0.
val zeros = List(
TimeSlotStats[String, Int](slotDuration = 1.minutes, slotCount = 0),
TimeSlotStats[String, Int](slotDuration = 0.minutes, slotCount = 1)
)
Inspectors.forAll(zeros) {
_ shouldBe empty
}
}
}