Merge pull request #833 from input-output-hk/ETCM-446-connection-limit-ranges

ETCM-446: Connection limit ranges
aakoshh authored Dec 15, 2020
2 parents a5092d0 + 87aa286 commit b368808
Showing 11 changed files with 458 additions and 22 deletions.
3 changes: 3 additions & 0 deletions src/it/scala/io/iohk/ethereum/sync/util/CommonFakePeer.scala
@@ -141,9 +141,12 @@ abstract class CommonFakePeer(peerName: String, fakePeerCustomConfig: FakePeerCu
override val connectMaxRetries: Int = 3
override val connectRetryDelay: FiniteDuration = 1 second
override val disconnectPoisonPillTimeout: FiniteDuration = 3 seconds
override val minOutgoingPeers = 5
override val maxOutgoingPeers = 10
override val maxIncomingPeers = 5
override val maxPendingPeers = 5
override val pruneIncomingPeers = 0
override val minPruneAge = 1.minute
override val networkId: Int = 1

override val updateNodesInitialDelay: FiniteDuration = 5.seconds
@@ -53,9 +53,12 @@ object DumpChainApp extends App with NodeKeyBuilder with SecureRandomBuilder wit
override val waitForChainCheckTimeout: FiniteDuration = Config.Network.peer.waitForChainCheckTimeout
override val fastSyncHostConfiguration: PeerManagerActor.FastSyncHostConfiguration =
Config.Network.peer.fastSyncHostConfiguration
override val minOutgoingPeers: Int = Config.Network.peer.minOutgoingPeers
override val maxOutgoingPeers: Int = Config.Network.peer.maxOutgoingPeers
override val maxIncomingPeers: Int = Config.Network.peer.maxIncomingPeers
override val maxPendingPeers: Int = Config.Network.peer.maxPendingPeers
override val pruneIncomingPeers: Int = Config.Network.peer.pruneIncomingPeers
override val minPruneAge: FiniteDuration = Config.Network.peer.minPruneAge
override val networkId: Int = privateNetworkId
override val updateNodesInitialDelay: FiniteDuration = 5.seconds
override val updateNodesInterval: FiniteDuration = 20.seconds
22 changes: 17 additions & 5 deletions src/main/resources/application.conf
@@ -139,11 +139,22 @@ mantis {
# Maximum MPT components in a single response message (as a blockchain host)
max-mpt-components-per-message = 200

# Maximum number of peers this node can connect to
# Minimum number of peers this node tries to connect to at all times
min-outgoing-peers = 20

# Maximum number of peers this node can connect to at any time.
# It's a bit higher than max-incoming-peers so that the node can quickly churn through incompatible peers after startup.
max-outgoing-peers = 50

# Maximum number of peers that can connect to this node
max-incoming-peers = 50
# Maximum number of peers that can connect to this node.
# Should be at least as high as `min-outgoing-peers`, so that at the network level `total(max-in) >= total(min-out)`
max-incoming-peers = 30

# Number of incoming peers to prune if we hit the maximum, to free up slots for new connections.
prune-incoming-peers = 10

# Minimum age of peers before they can be selected for pruning, and the minimum time to pass between pruning attempts.
min-prune-age = 30.minutes

# Maximum number of peers that can be connecting to this node
max-pending-peers = 20
@@ -163,9 +174,10 @@

# Resolution of moving window of peer statistics.
# Will be multiplied by `stat-slot-count` to give the overall length of peer statistics availability.
stat-slot-duration = 1.minute
stat-slot-duration = 10.minutes

# How many slots of peer statistics to keep in the moving window, e.g. 60 * 1.minute to keep stats for the last hour with 1 minute resolution.
stat-slot-count = 60
stat-slot-count = 72
}
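
Taken together, the peer limits and statistics settings above imply a couple of derived figures. The snippet below is a minimal Scala sketch that simply restates that arithmetic; the object name and hard-coded values mirror the defaults above and are purely illustrative, not part of this commit.

import scala.concurrent.duration._

// Illustrative restatement of the defaults above; not part of the commit.
object PeerConfigDefaults extends App {
  val minOutgoingPeers   = 20
  val maxOutgoingPeers   = 50
  val maxIncomingPeers   = 30
  val pruneIncomingPeers = 10
  val statSlotDuration   = 10.minutes
  val statSlotCount      = 72

  // The node keeps dialling until it has at least minOutgoingPeers handshaked connections,
  // and never tries to hold more than maxOutgoingPeers (pending connections included).
  println(s"outgoing target range: $minOutgoingPeers..$maxOutgoingPeers") // 20..50

  // Incoming connections are never pruned below this floor.
  println(maxIncomingPeers - pruneIncomingPeers) // 20

  // Overall length of the moving window of peer statistics used to rank peers for pruning.
  println(statSlotDuration * statSlotCount) // 720 minutes, i.e. 12 hours
}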

rpc {
59 changes: 53 additions & 6 deletions src/main/scala/io/iohk/ethereum/network/ConnectedPeers.scala
@@ -4,11 +4,14 @@ import java.net.InetSocketAddress

import akka.actor.ActorRef
import akka.util.ByteString
import scala.concurrent.duration.FiniteDuration

case class ConnectedPeers(
private val incomingPendingPeers: Map[PeerId, Peer],
private val outgoingPendingPeers: Map[PeerId, Peer],
private val handshakedPeers: Map[PeerId, Peer]
private val handshakedPeers: Map[PeerId, Peer],
private val pruningPeers: Map[PeerId, Peer],
private val lastPruneTimestamp: Long
) {

// FIXME: Kept only for compatibility purposes, should eventually be removed
@@ -32,11 +35,16 @@ case class ConnectedPeers(

lazy val incomingPendingPeersCount: Int = incomingPendingPeers.size
lazy val outgoingPendingPeersCount: Int = outgoingPendingPeers.size
lazy val incomingHandshakedPeersCount: Int = handshakedPeers.count { case (_, p) => p.incomingConnection }
lazy val outgoingPeersCount: Int = peers.count { case (_, p) => !p.incomingConnection }
lazy val pendingPeersCount: Int = incomingPendingPeersCount + outgoingPendingPeersCount

lazy val incomingHandshakedPeersCount: Int = handshakedPeers.count { case (_, p) => p.incomingConnection }
lazy val outgoingHandshakedPeersCount: Int = handshakedPeers.count { case (_, p) => !p.incomingConnection }
lazy val handshakedPeersCount: Int = handshakedPeers.size
lazy val pendingPeersCount: Int = incomingPendingPeersCount + outgoingPendingPeersCount

lazy val incomingPruningPeersCount: Int = pruningPeers.count { case (_, p) => p.incomingConnection }

/** Outgoing peers, whether handshaked or still pending. */
lazy val outgoingPeersCount: Int = peers.count { case (_, p) => !p.incomingConnection }

def getPeer(peerId: PeerId): Option[Peer] = peers.get(peerId)

@@ -65,11 +73,50 @@ case class ConnectedPeers(

(
peersId,
ConnectedPeers(incomingPendingPeers -- peersId, outgoingPendingPeers -- peersId, handshakedPeers -- peersId)
ConnectedPeers(
incomingPendingPeers -- peersId,
outgoingPendingPeers -- peersId,
handshakedPeers -- peersId,
pruningPeers -- peersId,
lastPruneTimestamp = lastPruneTimestamp
)
)
}

def prunePeers(
minAge: FiniteDuration,
numPeers: Int,
priority: PeerId => Double = _ => 0.0,
incoming: Boolean = true,
currentTimeMillis: Long = System.currentTimeMillis
): (Seq[Peer], ConnectedPeers) = {
val ageThreshold = currentTimeMillis - minAge.toMillis
if (lastPruneTimestamp > ageThreshold || numPeers == 0) {
// Protect against hostile takeovers by limiting the frequency of pruning.
(Seq.empty, this)
} else {
val candidates = handshakedPeers.values.filter(canPrune(incoming, ageThreshold)).toSeq

val toPrune = candidates.sortBy(peer => priority(peer.id)).take(numPeers)

val pruned = copy(
pruningPeers = toPrune.foldLeft(pruningPeers) { case (acc, peer) =>
acc + (peer.id -> peer)
},
lastPruneTimestamp = if (toPrune.nonEmpty) currentTimeMillis else lastPruneTimestamp
)

(toPrune, pruned)
}
}

private def canPrune(incoming: Boolean, minCreateTimeMillis: Long)(peer: Peer): Boolean = {
peer.incomingConnection == incoming &&
peer.createTimeMillis <= minCreateTimeMillis &&
!pruningPeers.contains(peer.id)
}
}

object ConnectedPeers {
def empty: ConnectedPeers = ConnectedPeers(Map.empty, Map.empty, Map.empty)
def empty: ConnectedPeers = ConnectedPeers(Map.empty, Map.empty, Map.empty, Map.empty, 0L)
}
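
For orientation, the guard at the top of prunePeers is what enforces a minimum interval between pruning rounds (the same threshold also excludes peers younger than minAge via canPrune). The sketch below restates just that check in isolation; mayPruneNow is a hypothetical helper introduced here for illustration and is not part of the commit.

import scala.concurrent.duration._

// Hypothetical helper mirroring the rate-limit check in prunePeers.
object PruneGuardSketch extends App {
  def mayPruneNow(lastPruneTimestamp: Long, minAge: FiniteDuration, now: Long): Boolean =
    lastPruneTimestamp <= now - minAge.toMillis

  val minAge = 30.minutes
  // A prune at t = 0 blocks further pruning until minAge has elapsed.
  println(mayPruneNow(lastPruneTimestamp = 0L, minAge, now = 31.minutes.toMillis)) // true
  println(mayPruneNow(lastPruneTimestamp = 0L, minAge, now = 10.minutes.toMillis)) // false
}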
3 changes: 2 additions & 1 deletion src/main/scala/io/iohk/ethereum/network/Peer.scala
@@ -16,7 +16,8 @@ case class Peer(
remoteAddress: InetSocketAddress,
ref: ActorRef,
incomingConnection: Boolean,
nodeId: Option[ByteString] = None
nodeId: Option[ByteString] = None,
createTimeMillis: Long = System.currentTimeMillis
) {
// FIXME: PeerId should be the actual peer id, i.e. the id derived from the node's public key
def id: PeerId = PeerId.fromRef(ref)
117 changes: 110 additions & 7 deletions src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala
@@ -54,8 +54,10 @@ class PeerManagerActor(
import akka.pattern.{ask, pipe}

implicit class ConnectedPeersOps(connectedPeers: ConnectedPeers) {

/** Number of new connections the node should try to open at any given time. */
def outgoingConnectionDemand: Int =
peerConfiguration.maxOutgoingPeers - connectedPeers.outgoingPeersCount
PeerManagerActor.outgoingConnectionDemand(connectedPeers, peerConfiguration)

def canConnectTo(node: Node): Boolean = {
val socketAddress = node.tcpSocketAddress
@@ -100,7 +102,8 @@
handleCommonMessages(connectedPeers) orElse
handleBlacklistMessages orElse
handleConnections(connectedPeers) orElse
handleNewNodesToConnectMessages(connectedPeers)
handleNewNodesToConnectMessages(connectedPeers) orElse
handlePruning(connectedPeers)
}

private def handleNewNodesToConnectMessages(connectedPeers: ConnectedPeers): Receive = {
@@ -266,6 +269,12 @@
handshakedPeer.incomingConnection && connectedPeers.incomingHandshakedPeersCount >= peerConfiguration.maxIncomingPeers
) {
handshakedPeer.ref ! PeerActor.DisconnectPeer(Disconnect.Reasons.TooManyPeers)

// It looks like all incoming slots are taken; try to make some room.
self ! SchedulePruneIncomingPeers

context become listening(connectedPeers)

} else if (handshakedPeer.nodeId.exists(connectedPeers.hasHandshakedWith)) {
// FIXME: peers received after handshake should always have their nodeId defined; we could later split
// this into PendingPeer/HandshakedPeer classes
@@ -286,13 +295,54 @@
): (Peer, ConnectedPeers) = {
val ref = peerFactory(context, address, incomingConnection)
context watch ref
val pendingPeer = Peer(address, ref, incomingConnection, None)
val pendingPeer = Peer(address, ref, incomingConnection, None, createTimeMillis = System.currentTimeMillis)

val newConnectedPeers = connectedPeers.addNewPendingPeer(pendingPeer)

(pendingPeer, newConnectedPeers)
}

private def handlePruning(connectedPeers: ConnectedPeers): Receive = {
case SchedulePruneIncomingPeers =>
implicit val timeout: Timeout = Timeout(peerConfiguration.updateNodesInterval)

// Ask for stats over the whole statistics window; we'll use averages to make it fair.
val window = peerConfiguration.statSlotCount * peerConfiguration.statSlotDuration

(peerStatistics ? PeerStatisticsActor.GetStatsForAll(window))
.mapTo[PeerStatisticsActor.StatsForAll]
.map(PruneIncomingPeers(_))
.pipeTo(self)

case PruneIncomingPeers(PeerStatisticsActor.StatsForAll(stats)) =>
val prunedConnectedPeers = pruneIncomingPeers(connectedPeers, stats)

context become listening(prunedConnectedPeers)
}

/** Disconnect some incoming connections so we can free up slots. */
private def pruneIncomingPeers(
connectedPeers: ConnectedPeers,
stats: Map[PeerId, PeerStat]
): ConnectedPeers = {
val pruneCount = PeerManagerActor.numberOfIncomingConnectionsToPrune(connectedPeers, peerConfiguration)
val now = System.currentTimeMillis
val (peersToPrune, prunedConnectedPeers) =
connectedPeers.prunePeers(
incoming = true,
minAge = peerConfiguration.minPruneAge,
numPeers = pruneCount,
priority = prunePriority(stats, now),
currentTimeMillis = now
)

peersToPrune.foreach { peer =>
peer.ref ! PeerActor.DisconnectPeer(Disconnect.Reasons.TooManyPeers)
}

prunedConnectedPeers
}

private def getPeers(peers: Set[Peer]): Future[Peers] = {
implicit val timeout: Timeout = Timeout(2.seconds)

@@ -394,7 +444,7 @@ object PeerManagerActor {
ctx.actorOf(props, id)
}

trait PeerConfiguration {
trait PeerConfiguration extends PeerConfiguration.ConnectionLimits {
val connectRetryDelay: FiniteDuration
val connectMaxRetries: Int
val disconnectPoisonPillTimeout: FiniteDuration
@@ -403,9 +453,6 @@
val waitForChainCheckTimeout: FiniteDuration
val fastSyncHostConfiguration: FastSyncHostConfiguration
val rlpxConfiguration: RLPxConfiguration
val maxOutgoingPeers: Int
val maxIncomingPeers: Int
val maxPendingPeers: Int
val networkId: Int
val updateNodesInitialDelay: FiniteDuration
val updateNodesInterval: FiniteDuration
@@ -414,6 +461,16 @@
val statSlotDuration: FiniteDuration
val statSlotCount: Int
}
object PeerConfiguration {
trait ConnectionLimits {
val minOutgoingPeers: Int
val maxOutgoingPeers: Int
val maxIncomingPeers: Int
val maxPendingPeers: Int
val pruneIncomingPeers: Int
val minPruneAge: FiniteDuration
}
}

trait FastSyncHostConfiguration {
val maxBlocksHeadersPerMessage: Int
@@ -447,4 +504,50 @@
case class OutgoingConnectionAlreadyHandled(uri: URI) extends ConnectionError

case class PeerAddress(value: String) extends BlackListId

case object SchedulePruneIncomingPeers
case class PruneIncomingPeers(stats: PeerStatisticsActor.StatsForAll)

/** Number of new connections the node should try to open at any given time. */
def outgoingConnectionDemand(
connectedPeers: ConnectedPeers,
peerConfiguration: PeerConfiguration.ConnectionLimits
): Int = {
if (connectedPeers.outgoingHandshakedPeersCount >= peerConfiguration.minOutgoingPeers)
// We have established at least the minimum number of working connections.
0
else
// Try to connect to more, up to the maximum, including pending peers.
peerConfiguration.maxOutgoingPeers - connectedPeers.outgoingPeersCount
}

def numberOfIncomingConnectionsToPrune(
connectedPeers: ConnectedPeers,
peerConfiguration: PeerConfiguration.ConnectionLimits
): Int = {
val minIncomingPeers = peerConfiguration.maxIncomingPeers - peerConfiguration.pruneIncomingPeers
math.max(
0,
connectedPeers.incomingHandshakedPeersCount - connectedPeers.incomingPruningPeersCount - minIncomingPeers
)
}

/** Assign a priority to peers that we can use to order connections,
* with lower priorities being the ones to prune first.
*/
def prunePriority(stats: Map[PeerId, PeerStat], currentTimeMillis: Long)(peerId: PeerId): Double = {
stats
.get(peerId)
.flatMap { stat =>
val maybeAgeSeconds = stat.firstSeenTimeMillis
.map(currentTimeMillis - _)
.map(_ / 1000)
.filter(_ > 0)

// Use the average number of responses per second over the lifetime of the connection
// as an indicator of how fruitful the peer is for us.
maybeAgeSeconds.map(age => stat.responsesReceived.toDouble / age)
}
.getOrElse(0.0)
}
}
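
To make the sizing helpers above concrete, here is a hedged worked example using the default limits from application.conf. The peer counts are invented, and the arithmetic restates the formulas with plain Ints rather than calling the real methods, which take a ConnectedPeers and a PeerConfiguration.ConnectionLimits. The prunePriority metric then decides which of the pruned incoming peers go first: those with the lowest average response rate over the statistics window.

// Worked example of the sizing arithmetic with the application.conf defaults;
// counts are invented, and this restates the formulas rather than calling the real methods.
object ConnectionSizingSketch extends App {
  val minOutgoingPeers   = 20
  val maxOutgoingPeers   = 50
  val maxIncomingPeers   = 30
  val pruneIncomingPeers = 10

  // outgoingConnectionDemand: 12 handshaked outgoing peers is below the minimum of 20,
  // so the node keeps dialling up to the maximum, counting the 3 pending connections.
  val outgoingHandshaked = 12
  val outgoingPending    = 3
  val demand =
    if (outgoingHandshaked >= minOutgoingPeers) 0
    else maxOutgoingPeers - (outgoingHandshaked + outgoingPending)
  println(demand) // 35

  // numberOfIncomingConnectionsToPrune: with all 30 incoming slots handshaked and nobody
  // being pruned yet, prune down to the floor of max-incoming-peers - prune-incoming-peers.
  val incomingHandshaked = 30
  val incomingPruning    = 0
  val toPrune = math.max(
    0,
    incomingHandshaked - incomingPruning - (maxIncomingPeers - pruneIncomingPeers)
  )
  println(toPrune) // 10
}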
3 changes: 3 additions & 0 deletions src/main/scala/io/iohk/ethereum/utils/Config.scala
@@ -63,9 +63,12 @@ object Config {
val waitForStatusTimeout: FiniteDuration = peerConfig.getDuration("wait-for-status-timeout").toMillis.millis
val waitForChainCheckTimeout: FiniteDuration =
peerConfig.getDuration("wait-for-chain-check-timeout").toMillis.millis
val minOutgoingPeers: Int = peerConfig.getInt("min-outgoing-peers")
val maxOutgoingPeers: Int = peerConfig.getInt("max-outgoing-peers")
val maxIncomingPeers: Int = peerConfig.getInt("max-incoming-peers")
val maxPendingPeers: Int = peerConfig.getInt("max-pending-peers")
val pruneIncomingPeers: Int = peerConfig.getInt("prune-incoming-peers")
val minPruneAge: FiniteDuration = peerConfig.getDuration("min-prune-age").toMillis.millis
val networkId: Int = blockchainConfig.networkId

val rlpxConfiguration = new RLPxConfiguration {
3 changes: 3 additions & 0 deletions src/test/resources/application.conf
@@ -8,9 +8,12 @@ mantis {
peer {
connect-retry-delay = 1 second
disconnect-poison-pill-timeout = 1 second
min-outgoing-peers = 3
max-outgoing-peers = 3
max-incoming-peers = 1
max-pending-peers = 1
prune-incoming-peers = 1
min-prune-age = 0.seconds
update-nodes-initial-delay = 5.seconds
update-nodes-interval = 10.seconds
}
@@ -280,9 +280,12 @@ class BlockchainHostActorSpec extends AnyFlatSpec with Matchers {
override val connectMaxRetries: Int = 3
override val connectRetryDelay: FiniteDuration = 1 second
override val disconnectPoisonPillTimeout: FiniteDuration = 5 seconds
override val minOutgoingPeers = 5
override val maxOutgoingPeers = 10
override val maxIncomingPeers = 5
override val maxPendingPeers = 5
override val pruneIncomingPeers = 0
override val minPruneAge = 1.minute
override val networkId: Int = 1

override val updateNodesInitialDelay: FiniteDuration = 5.seconds