Skip to content

Commit

Permalink
ETCM-466: Prune incoming peers beyond a certain age if hit the maximu…
Browse files Browse the repository at this point in the history
…m limit.
  • Loading branch information
aakoshh committed Dec 4, 2020
1 parent 9ad0621 commit cf34f4a
Show file tree
Hide file tree
Showing 9 changed files with 80 additions and 6 deletions.
2 changes: 2 additions & 0 deletions src/it/scala/io/iohk/ethereum/sync/util/CommonFakePeer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ abstract class CommonFakePeer(peerName: String, fakePeerCustomConfig: FakePeerCu
override val maxOutgoingPeers = 10
override val maxIncomingPeers = 5
override val maxPendingPeers = 5
override val pruneIncomingPeers = 0
override val minPruneAge = 1.minute
override val networkId: Int = 1

override val updateNodesInitialDelay: FiniteDuration = 5.seconds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ object DumpChainApp extends App with NodeKeyBuilder with SecureRandomBuilder wit
override val maxOutgoingPeers: Int = Config.Network.peer.maxOutgoingPeers
override val maxIncomingPeers: Int = Config.Network.peer.maxIncomingPeers
override val maxPendingPeers: Int = Config.Network.peer.maxPendingPeers
override val pruneIncomingPeers: Int = Config.Network.peer.pruneIncomingPeers
override val minPruneAge: FiniteDuration = Config.Network.peer.minPruneAge
override val networkId: Int = privateNetworkId
override val updateNodesInitialDelay: FiniteDuration = 5.seconds
override val updateNodesInterval: FiniteDuration = 20.seconds
Expand Down
8 changes: 7 additions & 1 deletion src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,13 @@ mantis {
max-outgoing-peers = 50

# Maximum number of peers that can connect to this node
max-incoming-peers = 30
max-incoming-peers = 50

# Number of incoming peers to prune if we hit the maximum, to free up slots for new connections.
prune-incoming-peers = 10

# Minimum age of peers before they can be selected for pruning.
min-prune-age = 30.minutes

# Maximum number of peers that can be connecting to this node
max-pending-peers = 20
Expand Down
43 changes: 40 additions & 3 deletions src/main/scala/io/iohk/ethereum/network/ConnectedPeers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@ import java.net.InetSocketAddress

import akka.actor.ActorRef
import akka.util.ByteString
import scala.concurrent.duration.FiniteDuration
import scala.util.Random

case class ConnectedPeers(
private val incomingPendingPeers: Map[PeerId, Peer],
private val outgoingPendingPeers: Map[PeerId, Peer],
private val handshakedPeers: Map[PeerId, Peer]
private val handshakedPeers: Map[PeerId, Peer],
private val pruningPeers: Map[PeerId, Peer],
private val lastPruneTimestamp: Long
) {

// FIXME: Kept only for compatibility purposes, should eventually be removed
Expand Down Expand Up @@ -38,6 +42,8 @@ case class ConnectedPeers(
lazy val outgoingHandshakedPeersCount: Int = handshakedPeers.count { case (_, p) => !p.incomingConnection }
lazy val handshakedPeersCount: Int = handshakedPeers.size

lazy val incomingPruningPeersCount: Int = pruningPeers.count { case (_, p) => p.incomingConnection }

/** Sum of handshaked and pending peers. */
lazy val outgoingPeersCount: Int = peers.count { case (_, p) => !p.incomingConnection }

Expand Down Expand Up @@ -68,11 +74,42 @@ case class ConnectedPeers(

(
peersId,
ConnectedPeers(incomingPendingPeers -- peersId, outgoingPendingPeers -- peersId, handshakedPeers -- peersId)
ConnectedPeers(
incomingPendingPeers -- peersId,
outgoingPendingPeers -- peersId,
handshakedPeers -- peersId,
pruningPeers -- peersId,
lastPruneTimestamp = lastPruneTimestamp
)
)
}

def prunePeers(incoming: Boolean, minAge: FiniteDuration, numPeers: Int): (Seq[Peer], ConnectedPeers) = {
val now = System.currentTimeMillis
val ageThreshold = now - minAge.toMillis
if (lastPruneTimestamp > ageThreshold) {
// Protect against hostile takeovers by limiting the frequency of pruning.
(Seq.empty, this)
} else {
val candidates = handshakedPeers.collect {
case (_, p)
if (p.incomingConnection || !incoming)
&& p.createTimeMillis <= ageThreshold
&& !pruningPeers.contains(p.id) =>
p
}.toSeq
val toPrune = Random.shuffle(candidates).take(numPeers)
val pruned = copy(
pruningPeers = toPrune.foldLeft(pruningPeers) { case (acc, peer) =>
acc + (peer.id -> peer)
},
lastPruneTimestamp = now
)
(toPrune, pruned)
}
}
}

object ConnectedPeers {
def empty: ConnectedPeers = ConnectedPeers(Map.empty, Map.empty, Map.empty)
def empty: ConnectedPeers = ConnectedPeers(Map.empty, Map.empty, Map.empty, Map.empty, 0L)
}
3 changes: 2 additions & 1 deletion src/main/scala/io/iohk/ethereum/network/Peer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ case class Peer(
remoteAddress: InetSocketAddress,
ref: ActorRef,
incomingConnection: Boolean,
nodeId: Option[ByteString] = None
nodeId: Option[ByteString] = None,
createTimeMillis: Long = System.currentTimeMillis
) {
// FIXME PeerId should be actual peerId i.e id derived form node public key
def id: PeerId = PeerId.fromRef(ref)
Expand Down
22 changes: 21 additions & 1 deletion src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,14 @@ class PeerManagerActor(
handshakedPeer.incomingConnection && connectedPeers.incomingHandshakedPeersCount >= peerConfiguration.maxIncomingPeers
) {
handshakedPeer.ref ! PeerActor.DisconnectPeer(Disconnect.Reasons.TooManyPeers)

// It looks like all incoming slots are taken; try to make some room.
val (peersToPrune, prunedConnectedPeers) = pruneIncomingPeers(connectedPeers)
peersToPrune.foreach { peer =>
peer.ref ! PeerActor.DisconnectPeer(Disconnect.Reasons.TooManyPeers)
}
context become listening(prunedConnectedPeers)

} else if (handshakedPeer.nodeId.exists(connectedPeers.hasHandshakedWith)) {
// FIXME: peers received after handshake should always have their nodeId defined, we could maybe later distinguish
// it into PendingPeer/HandshakedPeer classes
Expand All @@ -299,13 +307,23 @@ class PeerManagerActor(
): (Peer, ConnectedPeers) = {
val ref = peerFactory(context, address, incomingConnection)
context watch ref
val pendingPeer = Peer(address, ref, incomingConnection, None)
val pendingPeer = Peer(address, ref, incomingConnection, None, createTimeMillis = System.currentTimeMillis)

val newConnectedPeers = connectedPeers.addNewPendingPeer(pendingPeer)

(pendingPeer, newConnectedPeers)
}

/** Select some peers to be removed so we can free up connection slots. */
private def pruneIncomingPeers(connectedPeers: ConnectedPeers): (Seq[Peer], ConnectedPeers) = {
val minIncomingPeers = peerConfiguration.maxIncomingPeers - peerConfiguration.pruneIncomingPeers
val pruneCount = math.max(
connectedPeers.incomingHandshakedPeersCount - connectedPeers.incomingPruningPeersCount - minIncomingPeers,
0
)
connectedPeers.prunePeers(incoming = true, minAge = peerConfiguration.minPruneAge, numPeers = pruneCount)
}

private def getPeers(peers: Set[Peer]): Future[Peers] = {
implicit val timeout: Timeout = Timeout(2.seconds)

Expand Down Expand Up @@ -418,6 +436,8 @@ object PeerManagerActor {
val maxOutgoingPeers: Int
val maxIncomingPeers: Int
val maxPendingPeers: Int
val pruneIncomingPeers: Int
val minPruneAge: FiniteDuration
val networkId: Int
val updateNodesInitialDelay: FiniteDuration
val updateNodesInterval: FiniteDuration
Expand Down
2 changes: 2 additions & 0 deletions src/main/scala/io/iohk/ethereum/utils/Config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ object Config {
val maxOutgoingPeers: Int = peerConfig.getInt("max-outgoing-peers")
val maxIncomingPeers: Int = peerConfig.getInt("max-incoming-peers")
val maxPendingPeers: Int = peerConfig.getInt("max-pending-peers")
val pruneIncomingPeers: Int = peerConfig.getInt("prune-incoming-peers")
val minPruneAge: FiniteDuration = peerConfig.getDuration("min-prune-age").toMillis.millis
val networkId: Int = blockchainConfig.networkId

val rlpxConfiguration = new RLPxConfiguration {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ class BlockchainHostActorSpec extends AnyFlatSpec with Matchers {
override val maxOutgoingPeers = 10
override val maxIncomingPeers = 5
override val maxPendingPeers = 5
override val pruneIncomingPeers = 0
override val minPruneAge = 1.minute
override val networkId: Int = 1

override val updateNodesInitialDelay: FiniteDuration = 5.seconds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,8 @@ class PeerActorSpec
override val maxOutgoingPeers = 10
override val maxIncomingPeers = 5
override val maxPendingPeers = 5
override val pruneIncomingPeers = 0
override val minPruneAge = 1.minute
override val networkId: Int = 1

override val updateNodesInitialDelay: FiniteDuration = 5.seconds
Expand Down

0 comments on commit cf34f4a

Please sign in to comment.