diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 9766cb876d..44cceaac8c 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -71,7 +71,7 @@ object Dependencies {
)
val network: Seq[ModuleID] = {
- val scalanetVersion = "0.4-SNAPSHOT"
+ val scalanetVersion = "0.4.2-SNAPSHOT"
Seq(
"io.iohk" %% "scalanet" % scalanetVersion,
"io.iohk" %% "scalanet-discovery" % scalanetVersion
diff --git a/repo.nix b/repo.nix
index a7a364d535..f39bc6ea21 100644
--- a/repo.nix
+++ b/repo.nix
@@ -9,37 +9,37 @@
"nix-public" = "";
};
"artifacts" = {
- "nix-Sonatype OSS Snapshots/io/iohk/scalanet-discovery_2.12/0.4-SNAPSHOT/scalanet-discovery_2.12-0.4-SNAPSHOT-javadoc.jar" = {
- url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet-discovery_2.12/0.4-SNAPSHOT/scalanet-discovery_2.12-0.4-SNAPSHOT-javadoc.jar";
- sha256 = "E1BFC529ACE03B29B2D7AB72F8587C55EDEEEE211EC795E328E2F738D67ECE57";
+ "nix-Sonatype OSS Snapshots/io/iohk/scalanet-discovery_2.12/0.4.2-SNAPSHOT/scalanet-discovery_2.12-0.4.2-SNAPSHOT-javadoc.jar" = {
+ url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet-discovery_2.12/0.4.2-SNAPSHOT/scalanet-discovery_2.12-0.4.2-SNAPSHOT-javadoc.jar";
+ sha256 = "190A1AB2C6EBEBDAE6E8729005018C6524C79E87447F09F0EC26DA7005D27AE2";
};
- "nix-Sonatype OSS Snapshots/io/iohk/scalanet-discovery_2.12/0.4-SNAPSHOT/scalanet-discovery_2.12-0.4-SNAPSHOT-sources.jar" = {
- url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet-discovery_2.12/0.4-SNAPSHOT/scalanet-discovery_2.12-0.4-SNAPSHOT-sources.jar";
- sha256 = "A86A69FB1E4973AFE1FC83FA6F49C6E71221083F437CB75333461024EBC39962";
+ "nix-Sonatype OSS Snapshots/io/iohk/scalanet-discovery_2.12/0.4.2-SNAPSHOT/scalanet-discovery_2.12-0.4.2-SNAPSHOT-sources.jar" = {
+ url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet-discovery_2.12/0.4.2-SNAPSHOT/scalanet-discovery_2.12-0.4.2-SNAPSHOT-sources.jar";
+ sha256 = "BE91870FA3F3F1B4D37344563B33CB06E6451788CACF2CB7BAA77C0108D8E2E5";
};
- "nix-Sonatype OSS Snapshots/io/iohk/scalanet-discovery_2.12/0.4-SNAPSHOT/scalanet-discovery_2.12-0.4-SNAPSHOT.jar" = {
- url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet-discovery_2.12/0.4-SNAPSHOT/scalanet-discovery_2.12-0.4-SNAPSHOT.jar";
- sha256 = "72DB578799E07B3D10612C8A61BFA3D21EAC31312E0BAFECCD203D0D637D7498";
+ "nix-Sonatype OSS Snapshots/io/iohk/scalanet-discovery_2.12/0.4.2-SNAPSHOT/scalanet-discovery_2.12-0.4.2-SNAPSHOT.jar" = {
+ url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet-discovery_2.12/0.4.2-SNAPSHOT/scalanet-discovery_2.12-0.4.2-SNAPSHOT.jar";
+ sha256 = "2FCBBF064CC95DA4328FB259F4424C1B745826A308249169EE4343C0C962CE94";
};
- "nix-Sonatype OSS Snapshots/io/iohk/scalanet-discovery_2.12/0.4-SNAPSHOT/scalanet-discovery_2.12-0.4-SNAPSHOT.pom" = {
- url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet-discovery_2.12/0.4-SNAPSHOT/scalanet-discovery_2.12-0.4-SNAPSHOT.pom";
- sha256 = "30830E277F9651F63FA521F111BFF95E00DD3B96FEF481E8B334F6539521B738";
+ "nix-Sonatype OSS Snapshots/io/iohk/scalanet-discovery_2.12/0.4.2-SNAPSHOT/scalanet-discovery_2.12-0.4.2-SNAPSHOT.pom" = {
+ url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet-discovery_2.12/0.4.2-SNAPSHOT/scalanet-discovery_2.12-0.4.2-SNAPSHOT.pom";
+ sha256 = "46B60B737421B7E5E4CEB6412DCAAD80221EDC3ACD7E2227CDA01F1A9F57E18E";
};
- "nix-Sonatype OSS Snapshots/io/iohk/scalanet_2.12/0.4-SNAPSHOT/scalanet_2.12-0.4-SNAPSHOT-javadoc.jar" = {
- url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet_2.12/0.4-SNAPSHOT/scalanet_2.12-0.4-SNAPSHOT-javadoc.jar";
- sha256 = "B38F674EEA0E4660D2A39245D9BC129FD9171312C5CCCCE9DB5D1BC140865A86";
+ "nix-Sonatype OSS Snapshots/io/iohk/scalanet_2.12/0.4.2-SNAPSHOT/scalanet_2.12-0.4.2-SNAPSHOT-javadoc.jar" = {
+ url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet_2.12/0.4.2-SNAPSHOT/scalanet_2.12-0.4.2-SNAPSHOT-javadoc.jar";
+ sha256 = "84D56180DC60D6F4F6911D4507622E91DE4E2C96582F97784135DBFDECB0B12B";
};
- "nix-Sonatype OSS Snapshots/io/iohk/scalanet_2.12/0.4-SNAPSHOT/scalanet_2.12-0.4-SNAPSHOT-sources.jar" = {
- url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet_2.12/0.4-SNAPSHOT/scalanet_2.12-0.4-SNAPSHOT-sources.jar";
- sha256 = "DBF79681BAC9A923B0EAB597BE5B849C6F444BB11662BB07DC755BBBD97590EB";
+ "nix-Sonatype OSS Snapshots/io/iohk/scalanet_2.12/0.4.2-SNAPSHOT/scalanet_2.12-0.4.2-SNAPSHOT-sources.jar" = {
+ url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet_2.12/0.4.2-SNAPSHOT/scalanet_2.12-0.4.2-SNAPSHOT-sources.jar";
+ sha256 = "07553109F5461D45AC1B048C134132E6CBB32EC838300093E550B8CFE62C434B";
};
- "nix-Sonatype OSS Snapshots/io/iohk/scalanet_2.12/0.4-SNAPSHOT/scalanet_2.12-0.4-SNAPSHOT.jar" = {
- url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet_2.12/0.4-SNAPSHOT/scalanet_2.12-0.4-SNAPSHOT.jar";
- sha256 = "98D7F789C0A80FE7F810B0BD75A0F623E787C0212CEBDB71B2501EA7588C999C";
+ "nix-Sonatype OSS Snapshots/io/iohk/scalanet_2.12/0.4.2-SNAPSHOT/scalanet_2.12-0.4.2-SNAPSHOT.jar" = {
+ url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet_2.12/0.4.2-SNAPSHOT/scalanet_2.12-0.4.2-SNAPSHOT.jar";
+ sha256 = "7DF884B172D973459BB7AF6553014B984FE87EC788DD35AEB27F0DEDDD4B215E";
};
- "nix-Sonatype OSS Snapshots/io/iohk/scalanet_2.12/0.4-SNAPSHOT/scalanet_2.12-0.4-SNAPSHOT.pom" = {
- url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet_2.12/0.4-SNAPSHOT/scalanet_2.12-0.4-SNAPSHOT.pom";
- sha256 = "5EB55ED46D169049D4EB371DB120C6F6EE37AFE89230DFA7CE11298714801481";
+ "nix-Sonatype OSS Snapshots/io/iohk/scalanet_2.12/0.4.2-SNAPSHOT/scalanet_2.12-0.4.2-SNAPSHOT.pom" = {
+ url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet_2.12/0.4.2-SNAPSHOT/scalanet_2.12-0.4.2-SNAPSHOT.pom";
+ sha256 = "A27D2641B97A06FDA1B2C81FA96FA1ECA1E563ACA36CEEF3C8C6070C0F1CF731";
};
"nix-public/ch/megard/akka-http-cors_2.12/1.1.0/akka-http-cors_2.12-1.1.0-javadoc.jar" = {
url = "https://repo1.maven.org/maven2/ch/megard/akka-http-cors_2.12/1.1.0/akka-http-cors_2.12-1.1.0-javadoc.jar";
diff --git a/src/it/scala/io/iohk/ethereum/sync/util/CommonFakePeer.scala b/src/it/scala/io/iohk/ethereum/sync/util/CommonFakePeer.scala
index a3f303b1e8..846554ef00 100644
--- a/src/it/scala/io/iohk/ethereum/sync/util/CommonFakePeer.scala
+++ b/src/it/scala/io/iohk/ethereum/sync/util/CommonFakePeer.scala
@@ -19,7 +19,7 @@ import io.iohk.ethereum.mpt.MerklePatriciaTrie
import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
import io.iohk.ethereum.network.PeerManagerActor.{FastSyncHostConfiguration, PeerConfiguration}
import io.iohk.ethereum.network.discovery.{DiscoveryConfig, Node}
-import io.iohk.ethereum.network.discovery.PeerDiscoveryManager.{DiscoveredNodesInfo, DiscoveryNodeInfo}
+import io.iohk.ethereum.network.discovery.PeerDiscoveryManager.{DiscoveredNodesInfo}
import io.iohk.ethereum.network.handshaker.{EtcHandshaker, EtcHandshakerConfiguration, Handshaker}
import io.iohk.ethereum.network.p2p.EthereumMessageDecoder
import io.iohk.ethereum.network.p2p.messages.CommonMessages.NewBlock
@@ -193,10 +193,7 @@ abstract class CommonFakePeer(peerName: String, fakePeerCustomConfig: FakePeerCu
val listenAddress = randomAddress()
lazy val node =
- DiscoveryNodeInfo(
- Node(ByteString(nodeStatus.nodeId), listenAddress.getAddress, listenAddress.getPort, listenAddress.getPort),
- 1
- )
+ Node(ByteString(nodeStatus.nodeId), listenAddress.getAddress, listenAddress.getPort, listenAddress.getPort)
lazy val vmConfig = VmConfig(Config.config)
@@ -259,14 +256,14 @@ abstract class CommonFakePeer(peerName: String, fakePeerCustomConfig: FakePeerCu
} yield ()
}
- def connectToPeers(nodes: Set[DiscoveryNodeInfo]): Task[Unit] = {
+ def connectToPeers(nodes: Set[Node]): Task[Unit] = {
for {
_ <- Task {
peerManager ! DiscoveredNodesInfo(nodes)
}
_ <- retryUntilWithDelay(Task(storagesInstance.storages.knownNodesStorage.getKnownNodes()), 1.second, 5) {
knownNodes =>
- val requestedNodes = nodes.map(_.node.id)
+ val requestedNodes = nodes.map(_.id)
val currentNodes = knownNodes.map(Node.fromUri).map(_.id)
requestedNodes.subsetOf(currentNodes)
}
diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf
index 2ab2269787..1d9117783b 100644
--- a/src/main/resources/application.conf
+++ b/src/main/resources/application.conf
@@ -53,29 +53,51 @@ mantis {
# Turn discovery of/off
discovery-enabled = true
+ # Externally visible hostname or IP.
+ host = null
+
# Listening interface for discovery protocol
interface = "0.0.0.0"
# Listening port for discovery protocol
port = 30303
- # Maximum discovered nodes stored (TODO: remove me once full protocol is in place)
- nodes-limit = 5000
-
- # Initial delay for discovery scan
- scan-initial-delay = 20.seconds
+ # If true, the node considers the bootstrap and the previously persisted nodes
+ # as already discovered and uses them as peer candidates to get blocks from.
+ # Otherwise it enroll with the bootstrap nodes and gradually discover the
+ # network every time we start, eventually serving candidates.
+ #
+ # Useful if discovery has problem, as the node can start syncing with the
+ # bootstraps straight away.
+ #
+ # Note that setting reuse-known-nodes and discovery-enabled to false at the
+ # same time would mean the node would have no peer candidates at all.
+ reuse-known-nodes = true
# Scan interval for discovery
- scan-interval = 30.seconds
+ scan-interval = 15.minutes
# Discovery message expiration time
- message-expiration = 90.minutes
+ message-expiration = 1.minute
+
+ # Maximum amount a message can be expired by,
+ # accounting for possible discrepancies between nodes' clocks.
+ max-clock-drift = 15.seconds
+
+ # Maximum number of peers in each k-bucket.
+ kademlia-bucket-size = 16
+
+ # Timeout for individual requests like Ping.
+ request-timeout = 3.seconds
+
+ # Timeout to collect all possible responses for a FindNode request.
+ kademlia-timeout = 7.seconds
- # (TODO: remove me once full protocol is in place)
- scan-max-nodes = 20
+ # Level of concurrency during lookups and enrollment.
+ kademlia-alpha = 3
- # (TODO: remove me once full protocol is in place)
- max-sent-neighbours = 10
+ # Maximum number of messages in the queue associated with a UDP channel.
+ channel-capacity = 100
}
known-nodes {
diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml
index db99763494..2e19239c4d 100644
--- a/src/main/resources/logback.xml
+++ b/src/main/resources/logback.xml
@@ -56,6 +56,8 @@
+
+
diff --git a/src/main/scala/io/iohk/ethereum/faucet/jsonrpc/FaucetBuilder.scala b/src/main/scala/io/iohk/ethereum/faucet/jsonrpc/FaucetBuilder.scala
index b6604cd944..cbca34c906 100644
--- a/src/main/scala/io/iohk/ethereum/faucet/jsonrpc/FaucetBuilder.scala
+++ b/src/main/scala/io/iohk/ethereum/faucet/jsonrpc/FaucetBuilder.scala
@@ -64,7 +64,7 @@ trait SecureRandomBuilder {
self: FaucetConfigBuilder =>
lazy val secureRandom: SecureRandom =
ConfigUtils
- .getOptionalValue(rawMantisConfig, "secure-random-algo", config => config.getString("secure-random-algo"))
+ .getOptionalValue(rawMantisConfig, _.getString, "secure-random-algo")
.flatMap(name => Try { SecureRandom.getInstance(name) }.toOption)
.getOrElse(new SecureRandom())
}
diff --git a/src/main/scala/io/iohk/ethereum/jsonrpc/server/http/JsonRpcHttpServer.scala b/src/main/scala/io/iohk/ethereum/jsonrpc/server/http/JsonRpcHttpServer.scala
index f6faada726..29b513bd03 100644
--- a/src/main/scala/io/iohk/ethereum/jsonrpc/server/http/JsonRpcHttpServer.scala
+++ b/src/main/scala/io/iohk/ethereum/jsonrpc/server/http/JsonRpcHttpServer.scala
@@ -19,8 +19,6 @@ import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import org.json4s.{DefaultFormats, JInt, native}
-import scala.util.Try
-
trait JsonRpcHttpServer extends Json4sSupport {
val jsonRpcController: JsonRpcBaseController
val jsonRpcHealthChecker: JsonRpcHealthChecker
@@ -134,15 +132,12 @@ object JsonRpcHttpServer extends Logger {
override val corsAllowedOrigins = ConfigUtils.parseCorsAllowedOrigins(rpcHttpConfig, "cors-allowed-origins")
- override val certificateKeyStorePath: Option[String] = Try(
- rpcHttpConfig.getString("certificate-keystore-path")
- ).toOption
- override val certificateKeyStoreType: Option[String] = Try(
- rpcHttpConfig.getString("certificate-keystore-type")
- ).toOption
- override val certificatePasswordFile: Option[String] = Try(
- rpcHttpConfig.getString("certificate-password-file")
- ).toOption
+ override val certificateKeyStorePath: Option[String] =
+ ConfigUtils.getOptionalValue(rpcHttpConfig, _.getString, "certificate-keystore-path")
+ override val certificateKeyStoreType: Option[String] =
+ ConfigUtils.getOptionalValue(rpcHttpConfig, _.getString, "certificate-keystore-type")
+ override val certificatePasswordFile: Option[String] =
+ ConfigUtils.getOptionalValue(rpcHttpConfig, _.getString, "certificate-password-file")
}
}
}
diff --git a/src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala b/src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala
index 7e1ed32eee..445018dff1 100644
--- a/src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala
+++ b/src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala
@@ -41,9 +41,12 @@ class PeerManagerActor(
/**
* Maximum number of blacklisted nodes will never be larger than number of peers provided by discovery
* Discovery provides remote nodes from all networks (ETC,ETH, Mordor etc.) only during handshake we learn that some
- * of the remote nodes are not compatible that's why we mark them as useless (blacklist them)
+ * of the remote nodes are not compatible that's why we mark them as useless (blacklist them).
+ *
+ * The number of nodes in the current discovery is unlimited, but a guide may be the size of the routing table:
+ * one bucket for each bit in the hash of the public key, times the bucket size.
*/
- override val maxBlacklistedNodes: Int = discoveryConfig.nodesLimit
+ override val maxBlacklistedNodes: Int = 32 * 8 * discoveryConfig.kademliaBucketSize
import PeerManagerActor._
import akka.pattern.{ask, pipe}
@@ -96,23 +99,22 @@ class PeerManagerActor(
log.debug("The known nodes list is empty")
}
- case PeerDiscoveryManager.DiscoveredNodesInfo(nodesInfo) =>
- val nodesToConnect = nodesInfo
- .filterNot { discoveryNodeInfo =>
- val socketAddress = discoveryNodeInfo.node.tcpSocketAddress
- val alreadyConnected = connectedPeers.isConnectionHandled(socketAddress) || connectedPeers.hasHandshakedWith(
- discoveryNodeInfo.node.id
- )
+ case PeerDiscoveryManager.DiscoveredNodesInfo(nodes) =>
+ val nodesToConnect = nodes
+ .filterNot { node =>
+ val socketAddress = node.tcpSocketAddress
+ val alreadyConnected =
+ connectedPeers.isConnectionHandled(socketAddress) || connectedPeers.hasHandshakedWith(node.id)
alreadyConnected || isBlacklisted(PeerAddress(socketAddress.getHostString))
} // not already connected to or blacklisted
.take(peerConfiguration.maxOutgoingPeers - connectedPeers.outgoingPeersCount)
- NetworkMetrics.DiscoveredPeersSize.set(nodesInfo.size)
+ NetworkMetrics.DiscoveredPeersSize.set(nodes.size)
NetworkMetrics.BlacklistedPeersSize.set(blacklistedPeers.size)
NetworkMetrics.PendingPeersSize.set(connectedPeers.pendingPeersCount)
log.info(
- s"Discovered ${nodesInfo.size} nodes, " +
+ s"Discovered ${nodes.size} nodes, " +
s"Blacklisted ${blacklistedPeers.size} nodes, " +
s"handshaked to ${connectedPeers.handshakedPeersCount}/${peerConfiguration.maxOutgoingPeers + peerConfiguration.maxIncomingPeers}, " +
s"pending connection attempts ${connectedPeers.pendingPeersCount}. " +
@@ -121,7 +123,7 @@ class PeerManagerActor(
if (nodesToConnect.nonEmpty) {
log.debug("Trying to connect to {} nodes", nodesToConnect.size)
- nodesToConnect.foreach(n => self ! ConnectToPeer(n.node.toUri))
+ nodesToConnect.foreach(n => self ! ConnectToPeer(n.toUri))
} else {
log.debug("The nodes list is empty, no new nodes to connect to")
}
diff --git a/src/main/scala/io/iohk/ethereum/network/discovery/DiscoveryConfig.scala b/src/main/scala/io/iohk/ethereum/network/discovery/DiscoveryConfig.scala
index 610915eb54..e0c49ad97a 100644
--- a/src/main/scala/io/iohk/ethereum/network/discovery/DiscoveryConfig.scala
+++ b/src/main/scala/io/iohk/ethereum/network/discovery/DiscoveryConfig.scala
@@ -1,19 +1,24 @@
package io.iohk.ethereum.network.discovery
+import io.iohk.ethereum.utils.ConfigUtils
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
case class DiscoveryConfig(
discoveryEnabled: Boolean,
+ host: Option[String],
interface: String,
port: Int,
bootstrapNodes: Set[Node],
- nodesLimit: Int /* TODO: remove once proper discovery protocol is in place */,
- scanMaxNodes: Int /* TODO: remove once proper discovery protocol is in place */,
- maxNeighbours: Int /* TODO: remove once proper discovery protocol is in place */,
- scanInitialDelay: FiniteDuration,
+ reuseKnownNodes: Boolean,
scanInterval: FiniteDuration,
- messageExpiration: FiniteDuration
+ messageExpiration: FiniteDuration,
+ maxClockDrift: FiniteDuration,
+ requestTimeout: FiniteDuration,
+ kademliaTimeout: FiniteDuration,
+ kademliaBucketSize: Int,
+ kademliaAlpha: Int,
+ channelCapacity: Int
)
object DiscoveryConfig {
@@ -22,15 +27,19 @@ object DiscoveryConfig {
DiscoveryConfig(
discoveryEnabled = discoveryConfig.getBoolean("discovery-enabled"),
+ host = ConfigUtils.getOptionalValue(discoveryConfig, _.getString, "host"),
interface = discoveryConfig.getString("interface"),
port = discoveryConfig.getInt("port"),
bootstrapNodes = NodeParser.parseNodes(bootstrapNodes),
- nodesLimit = discoveryConfig.getInt("nodes-limit"),
- scanMaxNodes = discoveryConfig.getInt("scan-max-nodes"),
- maxNeighbours = discoveryConfig.getInt("max-sent-neighbours"),
- scanInitialDelay = discoveryConfig.getDuration("scan-initial-delay").toMillis.millis,
+ reuseKnownNodes = discoveryConfig.getBoolean("reuse-known-nodes"),
scanInterval = discoveryConfig.getDuration("scan-interval").toMillis.millis,
- messageExpiration = discoveryConfig.getDuration("message-expiration").toMillis.millis
+ messageExpiration = discoveryConfig.getDuration("message-expiration").toMillis.millis,
+ maxClockDrift = discoveryConfig.getDuration("max-clock-drift").toMillis.millis,
+ requestTimeout = discoveryConfig.getDuration("request-timeout").toMillis.millis,
+ kademliaTimeout = discoveryConfig.getDuration("kademlia-timeout").toMillis.millis,
+ kademliaBucketSize = discoveryConfig.getInt("kademlia-bucket-size"),
+ kademliaAlpha = discoveryConfig.getInt("kademlia-alpha"),
+ channelCapacity = discoveryConfig.getInt("channel-capacity")
)
}
diff --git a/src/main/scala/io/iohk/ethereum/network/discovery/DiscoveryListener.scala b/src/main/scala/io/iohk/ethereum/network/discovery/DiscoveryListener.scala
deleted file mode 100644
index b42192be6c..0000000000
--- a/src/main/scala/io/iohk/ethereum/network/discovery/DiscoveryListener.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-package io.iohk.ethereum.network.discovery
-
-import akka.actor.{Actor, ActorLogging, ActorRef, Props}
-import akka.io.{IO, Udp}
-import io.iohk.ethereum.utils.{NodeStatus, ServerStatus}
-import java.net.InetSocketAddress
-import java.util.concurrent.atomic.AtomicReference
-import scala.util.{Failure, Success}
-
-class DiscoveryListener(discoveryConfig: DiscoveryConfig, nodeStatusHolder: AtomicReference[NodeStatus])
- extends Actor
- with ActorLogging {
-
- import DiscoveryListener._
- import context.system
-
- var subscribers: Set[ActorRef] = Set.empty
-
- override def receive: Receive = handleSubscribe orElse {
- case Start =>
- IO(Udp) ! Udp.Bind(self, new InetSocketAddress(discoveryConfig.interface, discoveryConfig.port))
-
- case Udp.Bound(local) =>
- nodeStatusHolder.getAndUpdate(_.copy(discoveryStatus = ServerStatus.Listening(local)))
- context.become(ready(sender()))
- }
-
- def handleSubscribe: Receive = { case Subscribe =>
- subscribers += sender()
- }
-
- def ready(socket: ActorRef): Receive = handleSubscribe orElse {
- case Udp.Received(data, remote) =>
- val msgReceivedTry = for {
- packet <- decodePacket(data)
- message <- extractMessage(packet)
- } yield MessageReceived(message, remote, packet)
-
- msgReceivedTry match {
- case Success(msgReceived) => subscribers.foreach(_ ! msgReceived)
- case Failure(ex) => log.error(ex, "Unable to decode discovery packet from {}", remote)
- }
-
- case SendPacket(packet, to) =>
- socket ! Udp.Send(packet.wire, to)
- }
-}
-
-object DiscoveryListener {
- def props(config: DiscoveryConfig, nodeStatusHolder: AtomicReference[NodeStatus]): Props =
- Props(new DiscoveryListener(config, nodeStatusHolder))
-
- case object Start
- case object Subscribe
-
- case class SendPacket(packet: Packet, to: InetSocketAddress)
- case class MessageReceived(message: Message, from: InetSocketAddress, packet: Packet)
-}
diff --git a/src/main/scala/io/iohk/ethereum/network/discovery/DiscoveryServiceBuilder.scala b/src/main/scala/io/iohk/ethereum/network/discovery/DiscoveryServiceBuilder.scala
new file mode 100644
index 0000000000..7fb1d6f01b
--- /dev/null
+++ b/src/main/scala/io/iohk/ethereum/network/discovery/DiscoveryServiceBuilder.scala
@@ -0,0 +1,159 @@
+package io.iohk.ethereum.network.discovery
+
+import cats.effect.Resource
+import io.iohk.ethereum.crypto
+import io.iohk.ethereum.network.discovery.codecs.RLPCodecs
+import io.iohk.ethereum.utils.{NodeStatus, ServerStatus}
+import io.iohk.scalanet.discovery.crypto.{PrivateKey, PublicKey, SigAlg}
+import io.iohk.scalanet.discovery.ethereum.{Node => ENode, EthereumNodeRecord}
+import io.iohk.scalanet.discovery.ethereum.v4
+import io.iohk.scalanet.peergroup.{InetMultiAddress, ExternalAddressResolver}
+import io.iohk.scalanet.peergroup.udp.StaticUDPPeerGroup
+import java.net.InetAddress
+import java.net.InetSocketAddress
+import java.util.concurrent.atomic.AtomicReference
+import monix.eval.Task
+import monix.execution.Scheduler
+import scodec.bits.BitVector
+import scodec.Codec
+
+trait DiscoveryServiceBuilder {
+
+ def discoveryServiceResource(
+ discoveryConfig: DiscoveryConfig,
+ tcpPort: Int,
+ nodeStatusHolder: AtomicReference[NodeStatus]
+ )(implicit scheduler: Scheduler): Resource[Task, v4.DiscoveryService] = {
+
+ implicit val sigalg = new Secp256k1SigAlg()
+ val keyPair = nodeStatusHolder.get.key
+ val (privateKeyBytes, _) = crypto.keyPairToByteArrays(keyPair)
+ val privateKey = PrivateKey(BitVector(privateKeyBytes))
+
+ implicit val packetCodec = v4.Packet.packetCodec(allowDecodeOverMaxPacketSize = true)
+ implicit val payloadCodec = RLPCodecs.payloadCodec
+ implicit val enrContentCodec = RLPCodecs.codecFromRLPCodec(RLPCodecs.enrContentRLPCodec)
+
+ val resource = for {
+ host <- Resource.liftF {
+ getExternalAddress(discoveryConfig)
+ }
+ localNode = ENode(
+ id = sigalg.toPublicKey(privateKey),
+ address = ENode.Address(
+ ip = host,
+ udpPort = discoveryConfig.port,
+ tcpPort = tcpPort
+ )
+ )
+ config = makeDiscoveryConfig(discoveryConfig)
+ udpConfig = makeUdpConfig(discoveryConfig, host)
+ peerGroup <- StaticUDPPeerGroup[v4.Packet](udpConfig)
+ network <- makeDiscoveryNetwork(peerGroup, privateKey, localNode, config)
+ service <- makeDiscoveryService(network, privateKey, localNode, config)
+ _ <- Resource.liftF {
+ setDiscoveryStatus(nodeStatusHolder, ServerStatus.Listening(udpConfig.bindAddress))
+ }
+ } yield service
+
+ resource
+ .onFinalize {
+ setDiscoveryStatus(nodeStatusHolder, ServerStatus.NotListening)
+ }
+ }
+
+ private def makeDiscoveryConfig(discoveryConfig: DiscoveryConfig): v4.DiscoveryConfig =
+ v4.DiscoveryConfig.default.copy(
+ messageExpiration = discoveryConfig.messageExpiration,
+ maxClockDrift = discoveryConfig.maxClockDrift,
+ discoveryPeriod = discoveryConfig.scanInterval,
+ requestTimeout = discoveryConfig.requestTimeout,
+ kademliaTimeout = discoveryConfig.kademliaTimeout,
+ kademliaBucketSize = discoveryConfig.kademliaBucketSize,
+ kademliaAlpha = discoveryConfig.kademliaAlpha,
+ // Discovery is going to enroll with all the bootstrap nodes passed to it.
+ // In theory we could give it the (potentially hundreds of) nodes Mantis
+ // peristed on earlier runs. Because the enrollment happens in the background,
+ // it wouldn't cause any slowdown in the initialization process.
+ knownPeers = (discoveryConfig.bootstrapNodes).map { node =>
+ ENode(
+ id = PublicKey(BitVector(node.id.toArray[Byte])),
+ address = ENode.Address(
+ ip = node.addr,
+ udpPort = node.udpPort,
+ tcpPort = node.tcpPort
+ )
+ )
+ }
+ )
+
+ private def getExternalAddress(discoveryConfig: DiscoveryConfig): Task[InetAddress] =
+ discoveryConfig.host match {
+ case Some(host) =>
+ Task(InetAddress.getByName(host))
+
+ case None =>
+ ExternalAddressResolver.default.resolve.flatMap {
+ case Some(address) =>
+ Task.pure(address)
+ case None =>
+ Task.raiseError(
+ new IllegalStateException(
+ s"Failed to resolve the external address. Please configure it via -Dmantis.network.discovery.host"
+ )
+ )
+ }
+ }
+
+ private def makeUdpConfig(discoveryConfig: DiscoveryConfig, host: InetAddress): StaticUDPPeerGroup.Config =
+ StaticUDPPeerGroup.Config(
+ bindAddress = new InetSocketAddress(discoveryConfig.interface, discoveryConfig.port),
+ processAddress = InetMultiAddress(new InetSocketAddress(host, discoveryConfig.port)),
+ channelCapacity = discoveryConfig.channelCapacity,
+ receiveBufferSizeBytes = v4.Packet.MaxPacketBitsSize / 8 * 2
+ )
+
+ private def setDiscoveryStatus(nodeStatusHolder: AtomicReference[NodeStatus], status: ServerStatus): Task[Unit] =
+ Task(nodeStatusHolder.updateAndGet(_.copy(discoveryStatus = status)))
+
+ private def makeDiscoveryNetwork(
+ peerGroup: StaticUDPPeerGroup[v4.Packet],
+ privateKey: PrivateKey,
+ localNode: ENode,
+ config: v4.DiscoveryConfig
+ )(implicit codec: Codec[v4.Payload], sigalg: SigAlg): Resource[Task, v4.DiscoveryNetwork[InetMultiAddress]] =
+ Resource.liftF {
+ v4.DiscoveryNetwork[InetMultiAddress](
+ peerGroup = peerGroup,
+ privateKey = privateKey,
+ localNodeAddress = localNode.address,
+ toNodeAddress = (address: InetMultiAddress) =>
+ ENode.Address(
+ ip = address.inetSocketAddress.getAddress,
+ udpPort = address.inetSocketAddress.getPort,
+ tcpPort = 0
+ ),
+ config = config
+ )
+ }
+
+ private def makeDiscoveryService(
+ network: v4.DiscoveryNetwork[InetMultiAddress],
+ privateKey: PrivateKey,
+ localNode: ENode,
+ config: v4.DiscoveryConfig
+ )(implicit sigalg: SigAlg, enrContentCodec: Codec[EthereumNodeRecord.Content]): Resource[Task, v4.DiscoveryService] =
+ v4.DiscoveryService[InetMultiAddress](
+ privateKey = privateKey,
+ node = localNode,
+ config = config,
+ network = network,
+ toAddress = (address: ENode.Address) => InetMultiAddress(new InetSocketAddress(address.ip, address.udpPort)),
+ // On a network with many bootstrap nodes the enrollment and the initial self-lookup can take considerable
+ // amount of time. We can do the enrollment in the background, which means the service is available from the
+ // start, and the nodes can be contacted and gradually as they are discovered during the iterative lookup,
+ // rather than at the end of the enrollment. Mantis will also contact its previously persisted peers,
+ // from that perspective it doesn't care whether enrollment is over or not.
+ enrollInBackground = true
+ )
+}
diff --git a/src/main/scala/io/iohk/ethereum/network/discovery/PeerDiscoveryManager.scala b/src/main/scala/io/iohk/ethereum/network/discovery/PeerDiscoveryManager.scala
index 58bd74577c..5bf6150106 100644
--- a/src/main/scala/io/iohk/ethereum/network/discovery/PeerDiscoveryManager.scala
+++ b/src/main/scala/io/iohk/ethereum/network/discovery/PeerDiscoveryManager.scala
@@ -1,206 +1,203 @@
package io.iohk.ethereum.network.discovery
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
+import akka.pattern.pipe
import akka.util.ByteString
-import io.iohk.ethereum.crypto
+import cats.effect.Resource
import io.iohk.ethereum.db.storage.KnownNodesStorage
-import io.iohk.ethereum.network._
-import io.iohk.ethereum.rlp.RLPEncoder
-import io.iohk.ethereum.utils.{NodeStatus, ServerStatus}
-import java.net.{InetSocketAddress, URI}
-import java.time.Clock
-import java.util.concurrent.atomic.AtomicReference
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.util.Random
+import io.iohk.scalanet.discovery.ethereum.v4
+import monix.eval.Task
+import monix.execution.Scheduler
+import scala.util.{Failure, Success}
class PeerDiscoveryManager(
- discoveryListener: ActorRef,
+ localNodeId: ByteString,
discoveryConfig: DiscoveryConfig,
knownNodesStorage: KnownNodesStorage,
- nodeStatusHolder: AtomicReference[NodeStatus],
- clock: Clock
-) extends Actor
+ // The manager only starts the DiscoveryService if discovery is enabled.
+ discoveryServiceResource: Resource[Task, v4.DiscoveryService]
+)(implicit scheduler: Scheduler)
+ extends Actor
with ActorLogging {
import PeerDiscoveryManager._
- val expirationTimeSec = discoveryConfig.messageExpiration.toSeconds
+ // The following logic is for backwards compatibility.
+ val alreadyDiscoveredNodes: Set[Node] =
+ if (!discoveryConfig.reuseKnownNodes) Set.empty
+ else {
+ // The manager considered the bootstrap nodes discovered, even if discovery was disabled.
+ val bootstrapNodes: Set[Node] =
+ discoveryConfig.bootstrapNodes
+ // The known nodes were considered discovered even if they haven't yet responded to pings; unless discovery was disabled.
+ val knownNodes: Set[Node] =
+ if (!discoveryConfig.discoveryEnabled) Set.empty
+ else
+ knownNodesStorage.getKnownNodes().map(Node.fromUri)
+
+ bootstrapNodes ++ knownNodes
+ }
- val bootStrapNodesInfo = discoveryConfig.bootstrapNodes.map(DiscoveryNodeInfo.fromNode)
+ override def receive: Receive = init
- var pingedNodes: Map[ByteString, PingInfo] = Map.empty
+ // The service hasn't been started yet, so it just serves the static known nodes.
+ def init: Receive = {
+ case GetDiscoveredNodesInfo =>
+ sendDiscoveredNodesInfo(None, sender)
+
+ case Start =>
+ if (discoveryConfig.discoveryEnabled) {
+ log.info("Starting peer discovery...")
+ startDiscoveryService()
+ context.become(starting)
+ } else {
+ log.info("Peer discovery is disabled.")
+ }
- val startingNodes: Map[ByteString, DiscoveryNodeInfo] = {
- val knownNodesURIs =
- if (discoveryConfig.discoveryEnabled) knownNodesStorage.getKnownNodes()
- else Set.empty
- val startingNodesInfo = knownNodesURIs.map(uri => DiscoveryNodeInfo.fromUri(uri)) ++ bootStrapNodesInfo
- val startingNodesInfoWithoutSelf = startingNodesInfo.filterNot {
- _.node.id == ByteString(nodeStatusHolder.get().nodeId)
- }
- startingNodesInfoWithoutSelf.map { nodeInfo => nodeInfo.node.id -> nodeInfo }.toMap
+ case Stop =>
}
- var nodesInfo: Map[ByteString, DiscoveryNodeInfo] = startingNodes
+ // Waiting for the DiscoveryService to be initialized. Keep serving known nodes.
+ // This would not be needed if Actors were treated as resources themselves.
+ def starting: Receive = {
+ case GetDiscoveredNodesInfo =>
+ sendDiscoveredNodesInfo(None, sender)
- if (discoveryConfig.discoveryEnabled) {
- discoveryListener ! DiscoveryListener.Subscribe
- context.system.scheduler.scheduleWithFixedDelay(
- discoveryConfig.scanInitialDelay,
- discoveryConfig.scanInterval,
- self,
- Scan
- )
- }
+ case Start =>
- def scan(): Unit = {
- // Ping a random sample from currently pinged nodes without the answer
- new Random().shuffle(pingedNodes.values).take(2 * discoveryConfig.scanMaxNodes).foreach { pingInfo =>
- val node = pingInfo.nodeinfo.node
- sendPing(Endpoint.makeEndpoint(node.udpSocketAddress, node.tcpPort), node.udpSocketAddress, pingInfo.nodeinfo)
- }
+ case Stop =>
+ log.info("Stopping peer discovery...")
+ context.become(stopping)
- nodesInfo.values.toSeq
- .sortBy(_.addTimestamp)
- .takeRight(discoveryConfig.scanMaxNodes)
- .foreach { nodeInfo =>
- sendPing(
- Endpoint.makeEndpoint(nodeInfo.node.udpSocketAddress, nodeInfo.node.tcpPort),
- nodeInfo.node.udpSocketAddress,
- nodeInfo
- )
- }
- }
+ case StartAttempt(result) =>
+ result match {
+ case Right((service, release)) =>
+ log.info("Peer discovery started.")
+ context.become(started(service, release))
- override def receive: Receive = {
- case DiscoveryListener.MessageReceived(ping: Ping, from, packet) =>
- val to = Endpoint.makeEndpoint(from, ping.from.tcpPort)
- sendMessage(Pong(to, packet.mdc, expirationTimestamp), from)
-
- case DiscoveryListener.MessageReceived(pong: Pong, from, _) =>
- pingedNodes.get(pong.token).foreach { newNodeInfo =>
- val nodeInfoUpdatedTime = newNodeInfo.nodeinfo.copy(addTimestamp = clock.millis())
- pingedNodes -= pong.token
- nodesInfo = updateNodes(nodesInfo, nodeInfoUpdatedTime.node.id, nodeInfoUpdatedTime)
- sendMessage(FindNode(ByteString(nodeStatusHolder.get().nodeId), expirationTimestamp), from)
+ case Left(ex) =>
+ log.error(ex, "Failed to start peer discovery.")
+ context.become(init)
}
+ }
- case DiscoveryListener.MessageReceived(_: FindNode, from, _) =>
- sendMessage(Neighbours(getNeighbours(nodesInfo), expirationTimestamp), from)
+ // DiscoveryService started, we can ask it for nodes now.
+ def started(service: v4.DiscoveryService, release: Task[Unit]): Receive = {
+ case GetDiscoveredNodesInfo =>
+ sendDiscoveredNodesInfo(Some(service), sender)
- case DiscoveryListener.MessageReceived(neighbours: Neighbours, _, _) =>
- val toPing = neighbours.nodes
- .filterNot(n => nodesInfo.contains(n.nodeId)) // not already on the list
+ case Start =>
- toPing.foreach { n =>
- Endpoint.toUdpAddress(n.endpoint).foreach { address =>
- val nodeInfo =
- DiscoveryNodeInfo.fromNode(Node(n.nodeId, address.getAddress, n.endpoint.tcpPort, n.endpoint.udpPort))
- sendPing(n.endpoint, address, nodeInfo)
- }
- }
+ case Stop =>
+ log.info("Stopping peer discovery...")
+ stopDiscoveryService(release)
+ context.become(stopping)
+ }
+ // Waiting for the DiscoveryService to be initialized OR we received a stop request
+ // before it even got a chance to start, so we'll stop it immediately.
+ def stopping: Receive = {
case GetDiscoveredNodesInfo =>
- sender() ! DiscoveredNodesInfo(nodesInfo.values.toSet)
+ sendDiscoveredNodesInfo(None, sender)
- case Scan => scan()
- }
+ case Start | Stop =>
- private def sendPing(toEndpoint: Endpoint, toAddr: InetSocketAddress, nodeInfo: DiscoveryNodeInfo): Unit = {
- nodeStatusHolder.get().discoveryStatus match {
- case ServerStatus.Listening(address) =>
- val from = Endpoint.makeEndpoint(address, getTcpPort)
- val ping = Ping(ProtocolVersion, from, toEndpoint, expirationTimestamp)
- val packet = encodePacket(ping, nodeStatusHolder.get().key)
- getPacketData(packet).foreach { key =>
- pingedNodes = updateNodes(pingedNodes, key, PingInfo(nodeInfo, clock.millis()))
- }
- discoveryListener ! DiscoveryListener.SendPacket(Packet(packet), toAddr)
- case _ =>
- log.warning("UDP server not running. Not sending ping message.")
- }
- }
+ case StartAttempt(result) =>
+ result match {
+ case Right((_, release)) =>
+ log.info("Peer discovery started, now stopping...")
+ stopDiscoveryService(release)
- private def getTcpPort: Int = nodeStatusHolder.get().serverStatus match {
- case ServerStatus.Listening(addr) => addr.getPort
- case _ => 0
- }
-
- // FIXME come up with more spohisticated approach to keeping both mdc and sha(packet_data), now it is doubled in Map
- // It turns out that geth and parity sent different validation bytestrings in pong response
- // geth uses mdc, but parity uses sha3(packet_data), so we need to keep track of both things to do not
- // lose large part of potential nodes. https://github.com/ethereumproject/go-ethereum/issues/312
- private def getPacketData(ping: ByteString): List[ByteString] = {
- val packet = Packet(ping)
- val packetMdc = packet.mdc
- val packetDataHash = crypto.kec256(packet.data)
- List(packetMdc, packetDataHash)
- }
+ case Left(ex) =>
+ log.error(ex, "Failed to start peer discovery.")
+ context.become(init)
+ }
- private def updateNodes[V <: TimedInfo](map: Map[ByteString, V], key: ByteString, info: V): Map[ByteString, V] = {
- if (map.size < discoveryConfig.nodesLimit) {
- map + (key -> info)
- } else {
- replaceOldestNode(map, key, info)
- }
+ case StopAttempt(result) =>
+ result match {
+ case Right(_) =>
+ log.info("Peer discovery stopped.")
+ case Left(ex) =>
+ log.error(ex, "Failed to stop peer discovery.")
+ }
+ context.become(init)
}
- private def replaceOldestNode[V <: TimedInfo](
- map: Map[ByteString, V],
- key: ByteString,
- info: V
- ): Map[ByteString, V] = {
- val (earliestNode, _) = map.minBy { case (_, node) => node.addTimestamp }
- val newMap = map - earliestNode
- newMap + (key -> info)
+ def startDiscoveryService(): Unit = {
+ discoveryServiceResource.allocated.runToFuture
+ .onComplete {
+ case Failure(ex) =>
+ self ! StartAttempt(Left(ex))
+ case Success(result) =>
+ self ! StartAttempt(Right(result))
+ }
}
- private def sendMessage[M <: Message](message: M, to: InetSocketAddress)(implicit rlpEnc: RLPEncoder[M]): Unit = {
- nodeStatusHolder.get().discoveryStatus match {
- case ServerStatus.Listening(_) =>
- val packet = Packet(encodePacket(message, nodeStatusHolder.get().key))
- discoveryListener ! DiscoveryListener.SendPacket(packet, to)
- case _ =>
- log.warning(s"UDP server not running. Not sending message $message.")
+ def stopDiscoveryService(release: Task[Unit]): Unit = {
+ release.runToFuture.onComplete {
+ case Failure(ex) =>
+ self ! StopAttempt(Left(ex))
+ case Success(result) =>
+ self ! StopAttempt(Right(result))
}
}
- private def getNeighbours(nodesInfo: Map[ByteString, DiscoveryNodeInfo]): Seq[Neighbour] = {
- val randomNodes = new Random().shuffle(nodesInfo.values).take(discoveryConfig.maxNeighbours).toSeq
- randomNodes.map(nodeInfo =>
- Neighbour(Endpoint.makeEndpoint(nodeInfo.node.udpSocketAddress, nodeInfo.node.tcpPort), nodeInfo.node.id)
- )
- }
+ def sendDiscoveredNodesInfo(
+ maybeDiscoveryService: Option[v4.DiscoveryService],
+ recipient: ActorRef
+ ): Unit = {
+
+ val maybeDiscoveredNodes: Task[Set[Node]] =
+ maybeDiscoveryService.fold(Task.pure(Set.empty[Node])) {
+ _.getNodes.map { nodes =>
+ nodes.map { node =>
+ Node(
+ id = ByteString(node.id.toByteArray),
+ addr = node.address.ip,
+ tcpPort = node.address.tcpPort,
+ udpPort = node.address.udpPort
+ )
+ }
+ }
+ }
- private def expirationTimestamp = clock.instant().plusSeconds(expirationTimeSec).getEpochSecond
+ maybeDiscoveredNodes
+ .map(_ ++ alreadyDiscoveredNodes)
+ .map(_.filterNot(_.id == localNodeId))
+ .map(DiscoveredNodesInfo(_))
+ .doOnFinish {
+ case Some(ex) =>
+ Task(log.error(ex, "Failed to get discovered nodes."))
+ case _ =>
+ Task.unit
+ }
+ .runToFuture
+ .pipeTo(recipient)
+ }
}
object PeerDiscoveryManager {
def props(
- discoveryListener: ActorRef,
+ localNodeId: ByteString,
discoveryConfig: DiscoveryConfig,
knownNodesStorage: KnownNodesStorage,
- nodeStatusHolder: AtomicReference[NodeStatus],
- clock: Clock
- ): Props =
- Props(new PeerDiscoveryManager(discoveryListener, discoveryConfig, knownNodesStorage, nodeStatusHolder, clock))
-
- object DiscoveryNodeInfo {
-
- def fromUri(uri: URI): DiscoveryNodeInfo = fromNode(Node.fromUri(uri))
-
- def fromNode(node: Node): DiscoveryNodeInfo = DiscoveryNodeInfo(node, System.currentTimeMillis())
+ discoveryServiceResource: Resource[Task, v4.DiscoveryService]
+ )(implicit scheduler: Scheduler): Props =
+ Props(
+ new PeerDiscoveryManager(
+ localNodeId,
+ discoveryConfig,
+ knownNodesStorage,
+ discoveryServiceResource
+ )
+ )
- }
+ case object Start
+ case object Stop
- sealed abstract class TimedInfo {
- def addTimestamp: Long
- }
- case class DiscoveryNodeInfo(node: Node, addTimestamp: Long) extends TimedInfo
- case class PingInfo(nodeinfo: DiscoveryNodeInfo, addTimestamp: Long) extends TimedInfo
+ private case class StartAttempt(result: Either[Throwable, (v4.DiscoveryService, Task[Unit])])
+ private case class StopAttempt(result: Either[Throwable, Unit])
case object GetDiscoveredNodesInfo
- case class DiscoveredNodesInfo(nodes: Set[DiscoveryNodeInfo])
-
- private[discovery] case object Scan
+ case class DiscoveredNodesInfo(nodes: Set[Node])
}
diff --git a/src/main/scala/io/iohk/ethereum/network/discovery/codecs/RLPCodecs.scala b/src/main/scala/io/iohk/ethereum/network/discovery/codecs/RLPCodecs.scala
index 38b62a57a3..ed8c7babb3 100644
--- a/src/main/scala/io/iohk/ethereum/network/discovery/codecs/RLPCodecs.scala
+++ b/src/main/scala/io/iohk/ethereum/network/discovery/codecs/RLPCodecs.scala
@@ -5,22 +5,33 @@ import io.iohk.scalanet.discovery.ethereum.{Node, EthereumNodeRecord}
import io.iohk.scalanet.discovery.ethereum.v4.Payload
import io.iohk.scalanet.discovery.hash.Hash
import io.iohk.ethereum.rlp
-import io.iohk.ethereum.rlp.{RLPCodec, RLPList}
+import io.iohk.ethereum.rlp.{RLPCodec, RLPList, RLPEncoder}
import io.iohk.ethereum.rlp.RLPCodec.Ops
import io.iohk.ethereum.rlp.RLPImplicits._
-import io.iohk.ethereum.rlp.RLPImplicitConversions._
+import io.iohk.ethereum.rlp.RLPImplicitConversions.toEncodeable
import io.iohk.ethereum.rlp.RLPImplicitDerivations._
import scodec.{Codec, Attempt, Err, DecodeResult}
import scodec.bits.{BitVector, ByteVector}
import java.net.InetAddress
-import scala.collection.SortedMap
import scala.util.Try
-import io.iohk.ethereum.rlp.RLPEncoder
-import io.iohk.ethereum.rlp.RLPDecoder
+import io.iohk.ethereum.rlp.RLPEncodeable
/** RLP codecs based on https://github.com/ethereum/devp2p/blob/master/discv4.md */
-object RLPCodecs {
+object RLPCodecs extends ContentCodecs with PayloadCodecs {
+ implicit def codecFromRLPCodec[T: RLPCodec]: Codec[T] =
+ Codec[T](
+ (value: T) => {
+ val bytes = rlp.encode(value)
+ Attempt.successful(BitVector(bytes))
+ },
+ (bits: BitVector) => {
+ val tryDecode = Try(rlp.decode[T](bits.toByteArray))
+ Attempt.fromTry(tryDecode.map(DecodeResult(_, BitVector.empty)))
+ }
+ )
+}
+trait ContentCodecs {
implicit val inetAddressRLPCodec: RLPCodec[InetAddress] =
implicitly[RLPCodec[Array[Byte]]].xmap(InetAddress.getByAddress(_), _.getAddress)
@@ -48,57 +59,80 @@ object RLPCodecs {
RLPEncoder.encode(address).asInstanceOf[RLPList] :+ id
},
{
- case list @ RLPList(items @ _*) if items.length >= 4 =>
- val address = RLPDecoder.decode[Node.Address](list)
- val id = RLPDecoder.decode[PublicKey](items(3))
+ case RLPList(items @ _*) if items.length == 4 =>
+ val address = RLPList(items.take(3): _*).decodeAs[Node.Address]("address")
+ val id = items(3).decodeAs[PublicKey]("id")
Node(id, address)
}
)
// https://github.com/ethereum/devp2p/blob/master/enr.md#rlp-encoding
// content = [seq, k, v, ...]
- implicit val enrContentRLPCodec: RLPCodec[EthereumNodeRecord.Content] =
+ implicit val enrContentRLPCodec: RLPCodec[EthereumNodeRecord.Content] = {
+ // Differentiating by predefined keys is a workaround for the situation that
+ // EthereumNodeRecord holds ByteVectors, not RLPEncodeable instances in its map,
+ // but as per the spec the content can be anything (up to a total of 300 bytes).
+ // We need to be able to preserve the fidelity of the encoding over a roundtrip
+ // so that we can verify signatures, so we have to be able to put things in the
+ // map as bytes and later be able to tell whether they were originally an
+ // RLPValue on an RLPList.
+ // For now treat all predefined keys as bytes and everything else as RLP.
+ import EthereumNodeRecord.Keys.Predefined
+
RLPCodec.instance(
{ case EthereumNodeRecord.Content(seq, attrs) =>
val kvs = attrs
- .foldRight(RLPList()) { case ((k, v), kvs) =>
- k.toArray +: v.toArray +: kvs
+ .foldRight(RLPList()) { case ((key, value), kvs) =>
+ val k: RLPEncodeable = key
+ val v: RLPEncodeable = if (Predefined(key)) value else rlp.rawDecode(value.toArray)
+ k +: v +: kvs
}
-
seq +: kvs
},
{ case RLPList(seq, kvs @ _*) =>
val attrs = kvs
.grouped(2)
.collect { case Seq(k, v) =>
- rlp.decode[ByteVector](k) -> rlp.decode[ByteVector](v)
+ val key = k.decodeAs[ByteVector]("key")
+ val keyString = Try(new String(key.toArray)).getOrElse(key.toString)
+ val value =
+ if (Predefined(key)) {
+ v.decodeAs[ByteVector](s"value of key '${keyString}'")
+ } else {
+ ByteVector(rlp.encode(v))
+ }
+ key -> value
}
.toSeq
- // TODO: Should have a constructor for key-value pairs.
- import EthereumNodeRecord.byteOrdering
-
EthereumNodeRecord.Content(
- seq,
- SortedMap(attrs: _*)
+ seq.decodeAs[Long]("seq"),
+ attrs: _*
)
}
)
+ }
// record = [signature, seq, k, v, ...]
implicit val enrRLPCodec: RLPCodec[EthereumNodeRecord] =
RLPCodec.instance(
{ case EthereumNodeRecord(signature, content) =>
val contentList = RLPEncoder.encode(content).asInstanceOf[RLPList]
- signature.toByteArray +: contentList
+ signature +: contentList
},
{ case RLPList(signature, content @ _*) =>
EthereumNodeRecord(
- rlp.decode[Signature](signature),
- rlp.decode[EthereumNodeRecord.Content](RLPList(content: _*))
+ signature.decodeAs[Signature]("signature"),
+ RLPList(content: _*).decodeAs[EthereumNodeRecord.Content]("content")
)
}
)
+}
+
+trait PayloadCodecs { self: ContentCodecs =>
+
+ private implicit val payloadDerivationPolicy =
+ DerivationPolicy.default.copy(omitTrailingOptionals = true)
implicit val pingRLPCodec: RLPCodec[Payload.Ping] =
deriveLabelledGenericRLPCodec
@@ -118,18 +152,6 @@ object RLPCodecs {
implicit val enrResponseRLPCodec: RLPCodec[Payload.ENRResponse] =
deriveLabelledGenericRLPCodec
- implicit def codecFromRLPCodec[T: RLPCodec]: Codec[T] =
- Codec[T](
- (value: T) => {
- val bytes = rlp.encode(value)
- Attempt.successful(BitVector(bytes))
- },
- (bits: BitVector) => {
- val tryDecode = Try(rlp.decode[T](bits.toByteArray))
- Attempt.fromTry(tryDecode.map(DecodeResult(_, BitVector.empty)))
- }
- )
-
private object PacketType {
val Ping: Byte = 0x01
val Pong: Byte = 0x02
diff --git a/src/main/scala/io/iohk/ethereum/network/discovery/message.scala b/src/main/scala/io/iohk/ethereum/network/discovery/message.scala
deleted file mode 100644
index 08b6b0f54e..0000000000
--- a/src/main/scala/io/iohk/ethereum/network/discovery/message.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-package io.iohk.ethereum.network.discovery
-
-import java.net.{InetAddress, InetSocketAddress}
-
-import akka.util.ByteString
-import io.iohk.ethereum.rlp.{RLPDecoder, RLPEncodeable, RLPEncoder, RLPList}
-import io.iohk.ethereum.rlp.RLPImplicits._
-import io.iohk.ethereum.rlp.RLPImplicitConversions._
-
-import scala.util.Try
-
-sealed trait Message {
- def packetType: Byte
-}
-
-object Endpoint {
-
- def makeEndpoint(udpAddress: InetSocketAddress, tcpPort: Int): Endpoint =
- Endpoint(ByteString(udpAddress.getAddress.getAddress), udpAddress.getPort, tcpPort)
-
- def toUdpAddress(endpoint: Endpoint): Option[InetSocketAddress] = {
- val addr = Try(InetAddress.getByAddress(endpoint.address.toArray)).toOption
- addr.map(address => new InetSocketAddress(address, endpoint.udpPort))
- }
-
- implicit val rlpEncDec = new RLPEncoder[Endpoint] with RLPDecoder[Endpoint] {
- override def encode(obj: Endpoint): RLPEncodeable = {
- import obj._
- RLPList(address, udpPort, tcpPort)
- }
-
- override def decode(rlp: RLPEncodeable): Endpoint = rlp match {
- case RLPList(address, udpPort, tcpPort, _*) =>
- Endpoint(address, udpPort, tcpPort)
- case _ => throw new RuntimeException("Cannot decode Endpoint")
- }
- }
-
-}
-
-case class Endpoint(address: ByteString, udpPort: Int, tcpPort: Int)
-
-object Ping {
- val packetType: Byte = 0x01
-
- implicit val rlpEncDec = new RLPEncoder[Ping] with RLPDecoder[Ping] {
- override def encode(obj: Ping): RLPEncodeable = {
- import obj._
- RLPList(version, from, to, timestamp)
- }
-
- override def decode(rlp: RLPEncodeable): Ping = rlp match {
- case RLPList(version, from, to, timestamp, _*) =>
- Ping(version, Endpoint.rlpEncDec.decode(from), Endpoint.rlpEncDec.decode(to), timestamp)
- case _ => throw new RuntimeException("Cannot decode Ping")
- }
- }
-
-}
-
-case class Ping(version: Int, from: Endpoint, to: Endpoint, timestamp: Long) extends Message {
- override val packetType: Byte = Ping.packetType
-}
-
-object Neighbours {
- val packetType: Byte = 0x04
-
- implicit val rlpEncDec = new RLPEncoder[Neighbours] with RLPDecoder[Neighbours] {
- override def encode(obj: Neighbours): RLPEncodeable = {
- import obj._
- RLPList(RLPList(nodes.map(Neighbour.rlpEncDec.encode): _*), expires)
- }
-
- override def decode(rlp: RLPEncodeable): Neighbours = rlp match {
- case RLPList(nodes: RLPList, expires, _*) =>
- Neighbours(nodes.items.map(Neighbour.rlpEncDec.decode), expires)
- case _ => throw new RuntimeException("Cannot decode Neighbours")
- }
- }
-}
-
-case class Neighbours(nodes: Seq[Neighbour], expires: Long) extends Message {
- override val packetType: Byte = Neighbours.packetType
-}
-
-object Neighbour {
- implicit val rlpEncDec = new RLPEncoder[Neighbour] with RLPDecoder[Neighbour] {
- override def encode(obj: Neighbour): RLPEncodeable = {
- import obj._
- RLPList(endpoint.address, endpoint.udpPort, endpoint.tcpPort, nodeId)
- }
-
- override def decode(rlp: RLPEncodeable): Neighbour = rlp match {
- case RLPList(address, udpPort, tcpPort, nodeId, _*) =>
- Neighbour(Endpoint(address, udpPort, tcpPort), nodeId)
- case _ => throw new RuntimeException("Cannot decode Neighbour")
- }
- }
-}
-
-case class Neighbour(endpoint: Endpoint, nodeId: ByteString)
-
-case class FindNode(target: ByteString, expires: Long) extends Message {
- override val packetType: Byte = FindNode.packetType
-}
-
-object FindNode {
- val packetType: Byte = 0x03
-
- implicit val rlpEncDec = new RLPEncoder[FindNode] with RLPDecoder[FindNode] {
- override def encode(obj: FindNode): RLPEncodeable = {
- import obj._
- RLPList(target, expires)
- }
-
- override def decode(rlp: RLPEncodeable): FindNode = rlp match {
- case RLPList(target, expires, _*) =>
- FindNode(target, expires)
- case _ => throw new RuntimeException("Cannot decode FindNode")
- }
- }
-}
-
-object Pong {
- val packetType: Byte = 0x02
-
- implicit val rlpEncDec = new RLPEncoder[Pong] with RLPDecoder[Pong] {
- override def encode(obj: Pong): RLPEncodeable = {
- import obj._
- RLPList(to, token, timestamp)
- }
-
- override def decode(rlp: RLPEncodeable): Pong = rlp match {
- case RLPList(to, token, timestamp, _*) =>
- Pong(Endpoint.rlpEncDec.decode(to), token, timestamp)
- case _ => throw new RuntimeException("Cannot decode Pong")
- }
- }
-}
-
-case class Pong(to: Endpoint, token: ByteString, timestamp: Long) extends Message {
- override val packetType: Byte = Pong.packetType
-}
diff --git a/src/main/scala/io/iohk/ethereum/network/discovery/package.scala b/src/main/scala/io/iohk/ethereum/network/discovery/package.scala
deleted file mode 100644
index 409bf40612..0000000000
--- a/src/main/scala/io/iohk/ethereum/network/discovery/package.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-package io.iohk.ethereum.network
-
-import akka.util.ByteString
-import io.iohk.ethereum.crypto.ECDSASignature
-import io.iohk.ethereum.{crypto, rlp}
-import io.iohk.ethereum.rlp.RLPEncoder
-import org.bouncycastle.crypto.AsymmetricCipherKeyPair
-import org.bouncycastle.util.BigIntegers
-import scala.util.{Failure, Success, Try}
-
-package object discovery {
-
- object Packet {
- private val MdcLength = 32
- private val PacketTypeByteIndex = MdcLength + ECDSASignature.EncodedLength
- private val DataOffset = PacketTypeByteIndex + 1
- }
-
- case class Packet(wire: ByteString) {
- import Packet._
-
- lazy val nodeId: Option[ByteString] = {
- val msgHash = crypto.kec256(wire.drop(MdcLength + ECDSASignature.EncodedLength))
- signature.publicKey(msgHash.toArray[Byte], None).map(ByteString.apply)
- }
- def validated(): Option[Packet] = nodeId.map(_ => this)
-
- def data: ByteString = wire.drop(DataOffset)
-
- def packetType: Byte = wire(PacketTypeByteIndex)
-
- def mdc: ByteString = wire.take(MdcLength)
-
- def signature: ECDSASignature = {
- val signatureBytes = wire.drop(MdcLength).take(ECDSASignature.EncodedLength)
- val r = signatureBytes.take(ECDSASignature.RLength)
- val s = signatureBytes.drop(ECDSASignature.RLength).take(ECDSASignature.SLength)
- val v = (signatureBytes.last + 27).toByte
-
- ECDSASignature(r, s, v)
- }
- }
-
- private[discovery] def encodePacket[M <: Message](msg: M, keyPair: AsymmetricCipherKeyPair)(implicit
- rlpEnc: RLPEncoder[M]
- ): ByteString = {
- val encodedData = rlp.encode(msg)
-
- val payload = Array(msg.packetType) ++ encodedData
- val forSig = crypto.kec256(payload)
- val signature = ECDSASignature.sign(forSig, keyPair, None)
-
- val sigBytes =
- BigIntegers.asUnsignedByteArray(32, signature.r.bigInteger) ++
- BigIntegers.asUnsignedByteArray(32, signature.s.bigInteger) ++
- Array[Byte]((signature.v - 27).toByte)
-
- val forSha = sigBytes ++ Array(msg.packetType) ++ encodedData
- val mdc = crypto.kec256(forSha)
-
- ByteString(mdc ++ sigBytes ++ Array(msg.packetType) ++ encodedData)
- }
-
- private[discovery] def extractMessage(packet: Packet): Try[Message] = Try {
- packet.packetType match {
- case Ping.packetType => rlp.decode[Ping](packet.data.toArray[Byte])
- case Pong.packetType => rlp.decode[Pong](packet.data.toArray[Byte])
- case FindNode.packetType => rlp.decode[FindNode](packet.data.toArray[Byte])
- case Neighbours.packetType => rlp.decode[Neighbours](packet.data.toArray[Byte])
- case _ => throw new RuntimeException(s"Unknown packet type ${packet.packetType}")
- }
- }
-
- private[discovery] def decodePacket(input: ByteString): Try[Packet] = {
- if (input.length < 98) {
- Failure(new RuntimeException("Bad message"))
- } else {
- val packet = Packet(input).validated()
- packet.collect {
- case p if p.mdc == crypto.kec256(input.drop(32)) => Success(p)
- } getOrElse Failure(new RuntimeException("MDC check failed"))
- }
- }
-
-}
diff --git a/src/main/scala/io/iohk/ethereum/nodebuilder/NodeBuilder.scala b/src/main/scala/io/iohk/ethereum/nodebuilder/NodeBuilder.scala
index f4b56eab83..d4c19b32e9 100644
--- a/src/main/scala/io/iohk/ethereum/nodebuilder/NodeBuilder.scala
+++ b/src/main/scala/io/iohk/ethereum/nodebuilder/NodeBuilder.scala
@@ -1,7 +1,6 @@
package io.iohk.ethereum.nodebuilder
import java.security.SecureRandom
-import java.time.Clock
import java.util.concurrent.atomic.AtomicReference
import akka.actor.{ActorRef, ActorSystem}
@@ -25,7 +24,7 @@ import io.iohk.ethereum.ledger.Ledger.VMImpl
import io.iohk.ethereum.ledger._
import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
import io.iohk.ethereum.network.PeerManagerActor.PeerConfiguration
-import io.iohk.ethereum.network.discovery.{DiscoveryConfig, DiscoveryListener, PeerDiscoveryManager}
+import io.iohk.ethereum.network.discovery.{DiscoveryConfig, PeerDiscoveryManager, DiscoveryServiceBuilder}
import io.iohk.ethereum.network.handshaker.{EtcHandshaker, EtcHandshakerConfiguration, Handshaker}
import io.iohk.ethereum.network.p2p.EthereumMessageDecoder
import io.iohk.ethereum.network.rlpx.AuthHandshaker
@@ -35,11 +34,14 @@ import io.iohk.ethereum.testmode.{TestLedgerBuilder, TestmodeConsensusBuilder}
import io.iohk.ethereum.transactions.PendingTransactionsManager
import io.iohk.ethereum.utils.Config.SyncConfig
import io.iohk.ethereum.utils._
+import java.security.SecureRandom
+import java.util.concurrent.atomic.AtomicReference
+import io.iohk.ethereum.consensus.blocks.CheckpointBlockGenerator
import org.bouncycastle.crypto.AsymmetricCipherKeyPair
-
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}
+import akka.util.ByteString
// scalastyle:off number.of.types
trait BlockchainConfigBuilder {
@@ -107,30 +109,24 @@ trait KnownNodesManagerBuilder {
trait PeerDiscoveryManagerBuilder {
self: ActorSystemBuilder
- with DiscoveryListenerBuilder
with NodeStatusBuilder
with DiscoveryConfigBuilder
+ with DiscoveryServiceBuilder
with StorageBuilder =>
+ import monix.execution.Scheduler.Implicits.global
+
lazy val peerDiscoveryManager: ActorRef = system.actorOf(
PeerDiscoveryManager.props(
- discoveryListener,
+ localNodeId = ByteString(nodeStatusHolder.get.nodeId),
discoveryConfig,
storagesInstance.storages.knownNodesStorage,
- nodeStatusHolder,
- Clock.systemUTC()
+ discoveryServiceResource(discoveryConfig, tcpPort = Config.Network.Server.port, nodeStatusHolder)
),
"peer-discovery-manager"
)
}
-trait DiscoveryListenerBuilder {
- self: ActorSystemBuilder with DiscoveryConfigBuilder with NodeStatusBuilder =>
-
- lazy val discoveryListener: ActorRef =
- system.actorOf(DiscoveryListener.props(discoveryConfig, nodeStatusHolder), "discovery-listener")
-}
-
trait NodeStatusBuilder {
self: NodeKeyBuilder =>
@@ -658,8 +654,8 @@ trait Node
with AuthHandshakerBuilder
with PruningConfigBuilder
with PeerDiscoveryManagerBuilder
+ with DiscoveryServiceBuilder
with DiscoveryConfigBuilder
- with DiscoveryListenerBuilder
with KnownNodesManagerBuilder
with SyncConfigBuilder
with VmBuilder
diff --git a/src/main/scala/io/iohk/ethereum/nodebuilder/StdNode.scala b/src/main/scala/io/iohk/ethereum/nodebuilder/StdNode.scala
index babcefbbb1..12428bde0a 100644
--- a/src/main/scala/io/iohk/ethereum/nodebuilder/StdNode.scala
+++ b/src/main/scala/io/iohk/ethereum/nodebuilder/StdNode.scala
@@ -3,7 +3,7 @@ package io.iohk.ethereum.nodebuilder
import io.iohk.ethereum.blockchain.sync.SyncProtocol
import io.iohk.ethereum.consensus.StdConsensusBuilder
import io.iohk.ethereum.metrics.{Metrics, MetricsConfig}
-import io.iohk.ethereum.network.discovery.DiscoveryListener
+import io.iohk.ethereum.network.discovery.PeerDiscoveryManager
import io.iohk.ethereum.network.{PeerManagerActor, ServerActor}
import io.iohk.ethereum.testmode.{TestLedgerBuilder, TestmodeConsensusBuilder}
import io.iohk.ethereum.utils.Config
@@ -29,16 +29,11 @@ abstract class BaseNode extends Node {
private[this] def startServer(): Unit = server ! ServerActor.StartServer(networkConfig.Server.listenAddress)
- private[this] def startDiscoveryListener(): Unit =
- if (discoveryConfig.discoveryEnabled) {
- discoveryListener ! DiscoveryListener.Start
- }
-
private[this] def startSyncController(): Unit = syncController ! SyncProtocol.Start
private[this] def startConsensus(): Unit = consensus.startProtocol(this)
- private[this] def startDiscoveryManager(): Unit = peerDiscoveryManager // unlazy
+ private[this] def startDiscoveryManager(): Unit = peerDiscoveryManager ! PeerDiscoveryManager.Start
private[this] def startJsonRpcHttpServer(): Unit =
maybeJsonRpcHttpServer match {
@@ -69,8 +64,6 @@ abstract class BaseNode extends Node {
startServer()
- startDiscoveryListener()
-
startSyncController()
startConsensus()
@@ -87,6 +80,7 @@ abstract class BaseNode extends Node {
case Success(_) =>
}
+ tryAndLogFailure(() => peerDiscoveryManager ! PeerDiscoveryManager.Stop)
tryAndLogFailure(() => consensus.stopProtocol())
tryAndLogFailure(() =>
Await.ready(
diff --git a/src/main/scala/io/iohk/ethereum/rlp/RLPImplicitDerivations.scala b/src/main/scala/io/iohk/ethereum/rlp/RLPImplicitDerivations.scala
index 6cb33889c3..c445a5e9e3 100644
--- a/src/main/scala/io/iohk/ethereum/rlp/RLPImplicitDerivations.scala
+++ b/src/main/scala/io/iohk/ethereum/rlp/RLPImplicitDerivations.scala
@@ -15,7 +15,7 @@ object RLPImplicitDerivations {
omitTrailingOptionals: Boolean
)
object DerivationPolicy {
- val default = DerivationPolicy(omitTrailingOptionals = true)
+ val default = DerivationPolicy(omitTrailingOptionals = false)
}
/** Support introspecting on what happened during encoding the tail. */
@@ -59,21 +59,6 @@ object RLPImplicitDerivations {
}
}
- private def decodeError[T](subject: String, error: String, maybeEncodeable: Option[RLPEncodeable] = None): T =
- throw RLPException(s"Cannot decode $subject: $error", maybeEncodeable)
-
- private def tryDecode[T](subject: => String, encodeable: RLPEncodeable)(f: RLPEncodeable => T): T = {
- try {
- f(encodeable)
- } catch {
- case ex: RLPException =>
- // Preserve the original encodeable if there is one.
- decodeError(subject, ex.message, ex.encodeable orElse Some(encodeable))
- case NonFatal(ex) =>
- decodeError(subject, ex.getMessage, Some(encodeable))
- }
- }
-
/** Encoder for the empty list of fields. */
implicit val deriveHNilRLPListEncoder: RLPListEncoder[HNil] =
RLPListEncoder(_ => RLPList() -> Nil)
@@ -146,8 +131,18 @@ object RLPImplicitDerivations {
* We can ignore extra items in the RLPList as optional fields we don't handle,
* or extra random data, which we have for example in EIP8 test vectors.
*/
- implicit val deriveHNilRLPListDecoder: RLPListDecoder[HNil] =
- RLPListDecoder(_ => HNil -> Nil)
+ implicit def deriveHNilRLPListDecoder(implicit
+ policy: DerivationPolicy = DerivationPolicy.default
+ ): RLPListDecoder[HNil] =
+ RLPListDecoder {
+ case Nil => HNil -> Nil
+ case _ if policy.omitTrailingOptionals => HNil -> Nil
+ case items =>
+ throw RLPException(
+ s"Unexpected items at the end of the RLPList: ${items.size} leftover items.",
+ RLPList(items: _*)
+ )
+ }
/** Decoder for a list of fields in the generic represenation of a case class.
*
@@ -176,7 +171,7 @@ object RLPImplicitDerivations {
(head :: tail) -> (hInfo :: tInfos)
case Nil =>
- decodeError(subject, "RLPList is empty.")
+ RLPException.decodeError(subject, "RLPList is empty.")
case rlps =>
val (tail, tInfos) = tDecoder.value.decodeList(rlps.tail)
@@ -216,7 +211,7 @@ object RLPImplicitDerivations {
RLPListDecoder {
case Nil =>
- decodeError(subject, "RLPList is empty.")
+ RLPException.decodeError(subject, "RLPList is empty.")
case rlps =>
val value: H =
diff --git a/src/main/scala/io/iohk/ethereum/rlp/package.scala b/src/main/scala/io/iohk/ethereum/rlp/package.scala
index 8ddefce2dc..eeb1c3ccb5 100644
--- a/src/main/scala/io/iohk/ethereum/rlp/package.scala
+++ b/src/main/scala/io/iohk/ethereum/rlp/package.scala
@@ -3,16 +3,29 @@ package io.iohk.ethereum
import akka.util.ByteString
import org.bouncycastle.util.encoders.Hex
import scala.reflect.ClassTag
+import scala.util.control.NonFatal
package object rlp {
- case class RLPException(message: String, encodeable: Option[RLPEncodeable] = None) extends RuntimeException(message)
+ /** An exception capturing a deserialization error.
+ *
+ * The `encodeables` are a stack of values as we recursed into the data structure
+ * which may help deducting what went wrong. The last element is what caused the
+ * problem but it may be easier to recognise if we look at the head.
+ */
+ case class RLPException(message: String, encodeables: List[RLPEncodeable] = Nil) extends RuntimeException(message)
object RLPException {
def apply(message: String, encodeable: RLPEncodeable): RLPException =
- RLPException(message, Some(encodeable))
+ RLPException(message, List(encodeable))
+
+ def decodeError[T](subject: String, error: String, encodeables: List[RLPEncodeable] = Nil): T =
+ throw RLPException(s"Cannot decode $subject: $error", encodeables)
}
- sealed trait RLPEncodeable
+ sealed trait RLPEncodeable {
+ def decodeAs[T: RLPDecoder](subject: => String): T =
+ tryDecode[T](subject, this)(RLPDecoder[T].decode)
+ }
case class RLPList(items: RLPEncodeable*) extends RLPEncodeable {
def +:(item: RLPEncodeable): RLPList =
@@ -69,6 +82,17 @@ package object rlp {
def rawDecode(input: Array[Byte]): RLPEncodeable = RLP.rawDecode(input)
+ def tryDecode[T](subject: => String, encodeable: RLPEncodeable)(f: RLPEncodeable => T): T = {
+ try {
+ f(encodeable)
+ } catch {
+ case RLPException(message, encodeables) =>
+ RLPException.decodeError(subject, message, encodeable :: encodeables)
+ case NonFatal(ex) =>
+ RLPException.decodeError(subject, ex.getMessage, List(encodeable))
+ }
+ }
+
/**
* This function calculates the next element item based on a previous element starting position. It's meant to be
* used while decoding a stream of RLPEncoded Items.
@@ -99,7 +123,7 @@ package object rlp {
override def decode(rlp: RLPEncodeable): T =
if (dec.isDefinedAt(rlp)) dec(rlp)
- else throw RLPException(s"Cannot decode type ${ct.runtimeClass.getSimpleName} from unexpected RLP.", rlp)
+ else RLPException.decodeError(s"type ${ct.runtimeClass.getSimpleName}", "Unexpected RLP.", List(rlp))
}
def apply[T](enc: RLPEncoder[T], dec: RLPDecoder[T]): RLPCodec[T] =
diff --git a/src/main/scala/io/iohk/ethereum/utils/BlockchainConfig.scala b/src/main/scala/io/iohk/ethereum/utils/BlockchainConfig.scala
index fbfa8f671b..ac8f0751e9 100644
--- a/src/main/scala/io/iohk/ethereum/utils/BlockchainConfig.scala
+++ b/src/main/scala/io/iohk/ethereum/utils/BlockchainConfig.scala
@@ -52,7 +52,8 @@ object BlockchainConfig {
def fromRawConfig(blockchainConfig: TypesafeConfig): BlockchainConfig = {
val powTargetTime: Option[Long] =
ConfigUtils
- .getOptionalValue(blockchainConfig, "pow-target-time", _.getDuration("pow-target-time").getSeconds)
+ .getOptionalValue(blockchainConfig, _.getDuration, "pow-target-time")
+ .map(_.getSeconds)
val frontierBlockNumber: BigInt = BigInt(blockchainConfig.getString("frontier-block-number"))
val homesteadBlockNumber: BigInt = BigInt(blockchainConfig.getString("homestead-block-number"))
val eip106BlockNumber: BigInt = BigInt(blockchainConfig.getString("eip106-block-number"))
@@ -146,11 +147,7 @@ object BlockchainConfig {
// scalastyle:on method.length
private def readCheckpointPubKeys(blockchainConfig: TypesafeConfig): Set[ByteString] = {
val keys: Seq[String] = ConfigUtils
- .getOptionalValue(
- blockchainConfig,
- "checkpoint-public-keys",
- config => config.getStringList("checkpoint-public-keys")
- )
+ .getOptionalValue(blockchainConfig, _.getStringList, "checkpoint-public-keys")
.map(_.asScala)
.getOrElse(Seq.empty)
keys.map(ByteStringUtils.string2hash).toSet
diff --git a/src/main/scala/io/iohk/ethereum/utils/ConfigUtils.scala b/src/main/scala/io/iohk/ethereum/utils/ConfigUtils.scala
index d5c2af02b7..e3e6ecf6e4 100644
--- a/src/main/scala/io/iohk/ethereum/utils/ConfigUtils.scala
+++ b/src/main/scala/io/iohk/ethereum/utils/ConfigUtils.scala
@@ -22,8 +22,8 @@ object ConfigUtils {
case s => HttpOriginMatcher.Default(HttpOrigin(s) :: Nil)
}
- def getOptionalValue[V](config: TypesafeConfig, path: String, getter: TypesafeConfig => V): Option[V] =
- if (config.hasPath(path)) Some(getter(config))
+ def getOptionalValue[V](config: TypesafeConfig, getter: TypesafeConfig => String => V, path: String): Option[V] =
+ if (config.hasPath(path)) Some(getter(config)(path))
else None
def keys(config: TypesafeConfig): Set[String] = config
diff --git a/src/test/scala/io/iohk/ethereum/network/PeerManagerSpec.scala b/src/test/scala/io/iohk/ethereum/network/PeerManagerSpec.scala
index 54ac0ff524..0b67104de4 100644
--- a/src/test/scala/io/iohk/ethereum/network/PeerManagerSpec.scala
+++ b/src/test/scala/io/iohk/ethereum/network/PeerManagerSpec.scala
@@ -13,7 +13,7 @@ import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.PeerDisconnected
import io.iohk.ethereum.network.PeerEventBusActor.SubscriptionClassifier.PeerHandshaked
import io.iohk.ethereum.network.PeerEventBusActor.{PeerEvent, Publish, Subscribe}
import io.iohk.ethereum.network.PeerManagerActor.{GetPeers, PeerConfiguration, Peers, SendMessage}
-import io.iohk.ethereum.network.discovery.{DiscoveryConfig, PeerDiscoveryManager}
+import io.iohk.ethereum.network.discovery.{DiscoveryConfig, PeerDiscoveryManager, Node}
import io.iohk.ethereum.network.p2p.messages.CommonMessages.{NewBlock, Status}
import io.iohk.ethereum.network.p2p.messages.Versions
import io.iohk.ethereum.network.p2p.messages.WireProtocol.Disconnect
@@ -249,9 +249,9 @@ class PeerManagerSpec
val peerEventBus = TestProbe()
val knownNodesManager = TestProbe()
- val bootstrapNodes: Set[PeerDiscoveryManager.DiscoveryNodeInfo] =
+ val bootstrapNodes: Set[Node] =
DiscoveryConfig(Config.config, Config.blockchains.blockchainConfig.bootstrapNodes).bootstrapNodes
- .map(PeerDiscoveryManager.DiscoveryNodeInfo.fromNode)
+
val knownNodes: Set[URI] = Set.empty
val peerFactory: (ActorContext, InetSocketAddress, Boolean) => ActorRef = { (_, address, isIncoming) =>
diff --git a/src/test/scala/io/iohk/ethereum/network/discovery/PeerDiscoveryManagerSpec.scala b/src/test/scala/io/iohk/ethereum/network/discovery/PeerDiscoveryManagerSpec.scala
index 248b09b2c4..0b961c1de0 100644
--- a/src/test/scala/io/iohk/ethereum/network/discovery/PeerDiscoveryManagerSpec.scala
+++ b/src/test/scala/io/iohk/ethereum/network/discovery/PeerDiscoveryManagerSpec.scala
@@ -1,215 +1,230 @@
package io.iohk.ethereum.network.discovery
-import java.net.{InetAddress, InetSocketAddress}
-import java.time.{Clock, Instant, ZoneId}
+import akka.pattern.ask
import akka.actor.ActorSystem
-import akka.testkit.{TestActorRef, TestProbe}
-import akka.util.ByteString
-import com.miguno.akka.testing.VirtualTime
-import io.iohk.ethereum.NormalPatience
-import io.iohk.ethereum.blockchain.sync.EphemBlockchainTestSetup
-import io.iohk.ethereum.network.discovery.DiscoveryListener._
-import io.iohk.ethereum.network.discovery.PeerDiscoveryManager.{DiscoveryNodeInfo, PingInfo}
-import io.iohk.ethereum.nodebuilder.{NodeKeyBuilder, SecureRandomBuilder}
-import io.iohk.ethereum.rlp.RLPEncoder
-import io.iohk.ethereum.utils.{Config, NodeStatus, ServerStatus}
-import java.util.concurrent.atomic.AtomicReference
-import org.scalamock.scalatest.MockFactory
-import org.scalatest.concurrent.ScalaFutures
-import scala.util.Success
-import org.scalatest.flatspec.AnyFlatSpec
+import akka.testkit.{TestActorRef, TestKit}
+import akka.util.{ByteString, Timeout}
+import cats.effect.Resource
+import io.iohk.ethereum.utils.Config
+import io.iohk.ethereum.db.storage.KnownNodesStorage
+import io.iohk.scalanet.discovery.crypto.PublicKey
+import io.iohk.scalanet.discovery.ethereum.v4.DiscoveryService
+import io.iohk.scalanet.discovery.ethereum.{Node => ENode}
+import monix.eval.Task
+import monix.execution.Scheduler
+import org.scalatest.concurrent.Eventually
+import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.matchers.should.Matchers
+import org.scalatest.concurrent.ScalaFutures
+import org.scalamock.scalatest.MockFactory
+import scala.concurrent.duration._
+import scala.concurrent.Await
+import scodec.bits.BitVector
+import scala.util.control.NoStackTrace
+import io.iohk.ethereum.NormalPatience
class PeerDiscoveryManagerSpec
- extends AnyFlatSpec
+ extends TestKit(ActorSystem("PeerDiscoveryManagerSpec_System"))
+ with AnyFlatSpecLike
with Matchers
+ with Eventually
with MockFactory
with ScalaFutures
with NormalPatience {
- it should "correctly respond to Ping Message" in new TestSetup {
- val pingMessageReceived = MessageReceived(ping, remoteUdpAddress, pingPingPacketDecoded)
-
- discoveryPeerManager ! pingMessageReceived
-
- val packet =
- Packet(encodePacket(Pong(remoteEndpoint, pingPingPacketDecoded.mdc, expectedTime), nodeStatusHolder.get().key))
+ implicit val scheduler = Scheduler.Implicits.global
+ implicit val timeout: Timeout = 1.second
+
+ val defaultConfig = DiscoveryConfig(Config.config, Set.empty)
+
+ val sampleKnownUris = Set(
+ "enode://a59e33ccd2b3e52d578f1fbd70c6f9babda2650f0760d6ff3b37742fdcdfdb3defba5d56d315b40c46b70198c7621e63ffa3f987389c7118634b0fefbbdfa7fd@51.158.191.43:38556?discport=38556",
+ "enode://651b484b652c07c72adebfaaf8bc2bd95b420b16952ef3de76a9c00ef63f07cca02a20bd2363426f9e6fe372cef96a42b0fec3c747d118f79fd5e02f2a4ebd4e@51.158.190.99:45678?discport=45678",
+ "enode://9b1bf9613d859ac2071d88509ab40a111b75c1cfc51f4ad78a1fdbb429ff2405de0dc5ea8ae75e6ac88e03e51a465f0b27b517e78517f7220ae163a2e0692991@51.158.190.99:30426?discport=30426"
+ ).map(new java.net.URI(_))
+
+ val sampleBootstrapNodes = Set(
+ "enode://111bd28d5b2c1378d748383fd83ff59572967c317c3063a9f475a26ad3f1517642a164338fb5268d4e32ea1cc48e663bd627dec572f1d201c7198518e5a506b1@88.99.216.30:45834?discport=45834",
+ "enode://2b69a3926f36a7748c9021c34050be5e0b64346225e477fe7377070f6289bd363b2be73a06010fd516e6ea3ee90778dd0399bc007bb1281923a79374f842675a@51.15.116.226:30303?discport=30303"
+ ).map(new java.net.URI(_)).map(Node.fromUri)
+
+ trait Fixture {
+ lazy val discoveryConfig = defaultConfig
+ lazy val knownNodesStorage = mock[KnownNodesStorage]
+ lazy val discoveryService = mock[DiscoveryService]
+ lazy val discoveryServiceResource = Resource.pure[Task, DiscoveryService](discoveryService)
+
+ lazy val peerDiscoveryManager =
+ TestActorRef[PeerDiscoveryManager](
+ PeerDiscoveryManager.props(
+ localNodeId = ByteString.fromString("test-node"),
+ discoveryConfig = discoveryConfig,
+ knownNodesStorage = knownNodesStorage,
+ discoveryServiceResource = discoveryServiceResource
+ )
+ )
- val expectedPongResponse = SendPacket(packet, remoteUdpAddress)
+ def getPeers =
+ (peerDiscoveryManager ? PeerDiscoveryManager.GetDiscoveredNodesInfo)
+ .mapTo[PeerDiscoveryManager.DiscoveredNodesInfo]
- discoveryListner.expectMsg(expectedPongResponse)
+ def test(): Unit
}
- it should "correctly respond to Pong Message" in new TestSetup {
- val pong = Pong(toEndpoint, pingPingPacketDecoded.mdc, timestamp)
- val pongDecoded = getPacket(pong)
- val pongMessageReceiced = MessageReceived(pong, remoteUdpAddress, pongDecoded)
-
- pongDecoded.validated().isDefined should be(true)
-
- val nodeInfo = DiscoveryNodeInfo.fromNode(
- Node(pongDecoded.nodeId.get, remoteUdpAddress.getAddress, remoteUdpPort, remoteUdpPort)
- )
-
- discoveryPeerManager.underlyingActor.pingedNodes += pingPingPacketDecoded.mdc -> PingInfo(nodeInfo, timestamp)
-
- discoveryPeerManager ! pongMessageReceiced
-
- val expectedFindNodeResponse =
- SendPacket(
- Packet(encodePacket(FindNode(ByteString(nodeStatus.nodeId), expectedTime), nodeStatusHolder.get().key)),
- remoteUdpAddress
- )
-
- Thread.sleep(1500)
- discoveryListner.expectMsg(expectedFindNodeResponse)
- discoveryPeerManager.underlyingActor.nodesInfo.size shouldEqual 3 // 2 bootstraps + 1 new node
- discoveryPeerManager.underlyingActor.nodesInfo.values.toSet should contain(nodeInfo.copy(addTimestamp = 0))
+ def test(fixture: Fixture): Unit = {
+ try {
+ fixture.test()
+ } finally {
+ system.stop(fixture.peerDiscoveryManager)
+ }
}
- it should "correctly respond to FindNode Message" in new TestSetup {
- val findNode = FindNode(ByteString.empty, timestamp)
- val findeNodeDecoded = getPacket(findNode)
- val findNodeMessageReceived = MessageReceived(findNode, remoteUdpAddress, findeNodeDecoded)
-
- discoveryPeerManager ! findNodeMessageReceived
-
- val expectedFindNodeResponse = (
- Packet(encodePacket(Neighbours(bootNeighbours, expectedTime), nodeStatusHolder.get().key)),
- remoteUdpAddress
+ def toENode(node: Node): ENode =
+ ENode(
+ id = PublicKey(BitVector(node.id.toArray[Byte])),
+ address = ENode.Address(ip = node.addr, udpPort = node.udpPort, tcpPort = node.tcpPort)
)
- val r = discoveryListner.expectMsgType[SendPacket]
+ behavior of "PeerDiscoveryManager"
+
+ it should "serve no peers if discovery is disabled and known peers are disabled and the manager isn't started" in test {
+ new Fixture {
+ override lazy val discoveryConfig =
+ defaultConfig.copy(discoveryEnabled = false, reuseKnownNodes = false, bootstrapNodes = Set.empty)
- extractMessage(r.packet) match {
- case Success(Neighbours(received, _)) =>
- received should contain theSameElementsAs bootNeighbours
- case _ => fail("Wrong message")
+ override def test(): Unit = {
+ getPeers.futureValue.nodes shouldBe empty
+ }
}
}
- it should "correctly respond to neighbours Message" in new TestSetup {
- val neighboursm = Neighbours(neighbours, timestamp)
- val neighboursDecoded = getPacket(neighboursm)
- val neighboursMessageReceived = MessageReceived(neighboursm, remoteUdpAddress, neighboursDecoded)
-
- discoveryPeerManager ! neighboursMessageReceived
-
- expectedMes.foreach(mess => discoveryListner.expectMsg(mess))
- // necessery doubling because of different pong validations in parity and geth
- discoveryPeerManager.underlyingActor.pingedNodes.size shouldEqual (neighbours.size * 2)
- }
+ it should "serve the bootstrap nodes if known peers are reused even discovery isn't enabled and the manager isn't started" in test {
+ new Fixture {
+ override lazy val discoveryConfig =
+ defaultConfig.copy(discoveryEnabled = false, reuseKnownNodes = true, bootstrapNodes = sampleBootstrapNodes)
- it should "correctly scan bootstrap nodes" in new TestSetup {
- discoveryPeerManager ! PeerDiscoveryManager.Scan
- Thread.sleep(500)
- // necessery doubling because of different pong validations in parity and geth
- discoveryPeerManager.underlyingActor.pingedNodes.size shouldEqual (bootstrapNodes.size * 2)
- expectedBootStrapPings.foreach(mes => discoveryListner.expectMsg(mes._2))
+ override def test(): Unit = {
+ getPeers.futureValue.nodes should contain theSameElementsAs (sampleBootstrapNodes)
+ }
+ }
}
- // scalastyle:off magic.number
- trait TestSetup extends MockFactory with SecureRandomBuilder with NodeKeyBuilder with EphemBlockchainTestSetup {
-
- import DiscoveryListener._
+ it should "serve the known peers if discovery is enabled and the manager isn't started" in test {
+ new Fixture {
+ override lazy val discoveryConfig =
+ defaultConfig.copy(discoveryEnabled = true, reuseKnownNodes = true, bootstrapNodes = Set.empty)
- override implicit lazy val system = ActorSystem("DiscoverySpec_System")
- val time = new VirtualTime
+ (knownNodesStorage.getKnownNodes _)
+ .expects()
+ .returning(sampleKnownUris)
+ .once()
- def getPacket[M <: Message](m: M)(implicit rlpEnc: RLPEncoder[M]): Packet = {
- val encoded = encodePacket(m, nodeKey)
- decodePacket(encoded).get
+ override def test(): Unit = {
+ getPeers.futureValue.nodes.map(_.toUri) should contain theSameElementsAs (sampleKnownUris)
+ }
}
+ }
- val discoveryConfig = DiscoveryConfig(Config.config, Config.blockchains.blockchainConfig.bootstrapNodes)
-
- val bootstrapNodes = discoveryConfig.bootstrapNodes.map(DiscoveryNodeInfo.fromNode).toSeq
-
- val bootNeighbours = bootstrapNodes
- .map(node => Neighbour(Endpoint.makeEndpoint(node.node.udpSocketAddress, node.node.tcpPort), node.node.id))
- .toList
-
- val expTimeSec = discoveryConfig.messageExpiration.toSeconds
- val discoveryListner = TestProbe()
-
- val address = InetAddress.getLocalHost
- val port = 30303
- val localAddress = new InetSocketAddress(address, port)
+ it should "merge the known peers with the service if it's started" in test {
+ new Fixture {
+ override lazy val discoveryConfig =
+ defaultConfig.copy(discoveryEnabled = true, reuseKnownNodes = true, bootstrapNodes = Set.empty)
- val remoteAddr = "31.178.1.7"
- val remoteUdpPort = 30303
- val remoteTcpPort = 9076
- val remoteUdpAddress = new InetSocketAddress(remoteAddr, remoteUdpPort)
+ val sampleNodes = sampleBootstrapNodes
- val nodeStatus =
- NodeStatus(
- key = nodeKey,
- serverStatus = ServerStatus.Listening(localAddress),
- discoveryStatus = ServerStatus.Listening(localAddress)
- )
+ (knownNodesStorage.getKnownNodes _)
+ .expects()
+ .returning(sampleKnownUris)
+ .once()
- val nodeStatusHolder = new AtomicReference(nodeStatus)
- val fakeClock = Clock.fixed(Instant.ofEpochSecond(0), ZoneId.systemDefault())
- val discoveryPeerManager = TestActorRef[PeerDiscoveryManager](
- PeerDiscoveryManager.props(
- discoveryListner.ref,
- discoveryConfig,
- storagesInstance.storages.knownNodesStorage,
- nodeStatusHolder,
- fakeClock
- )
- )
+ (discoveryService.getNodes _)
+ .expects()
+ .returning(Task(sampleNodes.map(toENode)))
+ .once()
- val expectedTime = fakeClock.instant().plusSeconds(expTimeSec).getEpochSecond
- discoveryListner.expectMsg(Subscribe)
+ val expected = sampleKnownUris ++ sampleNodes.map(_.toUri)
- val version = 4
- val toEndpoint = Endpoint.makeEndpoint(localAddress, port)
- val remoteEndpoint = Endpoint.makeEndpoint(remoteUdpAddress, remoteTcpPort)
- val timestamp = Long.MaxValue
-
- val ping = Ping(version, remoteEndpoint, toEndpoint, timestamp)
- val pingPingPacketDecoded = getPacket(ping)
+ override def test(): Unit = {
+ peerDiscoveryManager ! PeerDiscoveryManager.Start
+ eventually {
+ getPeers.futureValue.nodes.map(_.toUri) should contain theSameElementsAs (expected)
+ }
+ }
+ }
+ }
- val neighboursCount = 9
- val neighbours = (1 to 9).map { n =>
- val newAddress: Array[Byte] = Array(31, 178, 1, n).map(_.toByte)
- val newId: Array[Byte] = Array.fill(64) {
- n.toByte
+ it should "keep serving the known peers if the service fails to start" in test {
+ new Fixture {
+ override lazy val discoveryConfig =
+ defaultConfig.copy(discoveryEnabled = true, reuseKnownNodes = true, bootstrapNodes = Set.empty)
+
+ @volatile var started = false
+
+ override lazy val discoveryServiceResource: Resource[Task, DiscoveryService] =
+ Resource.liftF {
+ Task { started = true } >>
+ Task.raiseError[DiscoveryService](new RuntimeException("Oh no!") with NoStackTrace)
+ }
+
+ (knownNodesStorage.getKnownNodes _)
+ .expects()
+ .returning(sampleKnownUris)
+ .once()
+
+ override def test(): Unit = {
+ peerDiscoveryManager ! PeerDiscoveryManager.Start
+ eventually {
+ started shouldBe true
+ }
+ getPeers.futureValue.nodes should have size (sampleKnownUris.size)
}
- val socketAddress = new InetSocketAddress(InetAddress.getByAddress(newAddress), remoteUdpPort)
- val nodeId = Node(ByteString(newId), InetAddress.getByAddress(newAddress), remoteTcpPort, remoteUdpPort).id
- val neighbourEndpoint = Endpoint.makeEndpoint(socketAddress, remoteTcpPort)
- Neighbour(neighbourEndpoint, nodeId)
- }.toSeq
-
- val expectedMes = neighbours.map { n =>
- val ping = Ping(version, toEndpoint, n.endpoint, expectedTime)
- SendPacket(
- Packet(encodePacket(ping, nodeStatusHolder.get().key)),
- new InetSocketAddress(InetAddress.getByAddress(n.endpoint.address.toArray), n.endpoint.udpPort)
- )
}
+ }
- val expectedBootStrapPings = bootstrapNodes.map { node =>
- (
- node,
- SendPacket(
- Packet(
- encodePacket(
- Ping(
- version,
- Endpoint.makeEndpoint(localAddress, port),
- Endpoint.makeEndpoint(node.node.udpSocketAddress, node.node.tcpPort),
- expectedTime
- ),
- nodeStatusHolder.get().key
- )
- ),
- node.node.udpSocketAddress
- )
- )
+ it should "stop using the service after it is stopped" in test {
+ new Fixture {
+ override lazy val discoveryConfig =
+ defaultConfig.copy(discoveryEnabled = true, reuseKnownNodes = true, bootstrapNodes = Set.empty)
+
+ (knownNodesStorage.getKnownNodes _)
+ .expects()
+ .returning(sampleKnownUris)
+ .once()
+
+ (discoveryService.getNodes _)
+ .expects()
+ .returning(Task(sampleBootstrapNodes.map(toENode)))
+ .once()
+
+ override def test(): Unit = {
+ peerDiscoveryManager ! PeerDiscoveryManager.Start
+ eventually {
+ getPeers.futureValue.nodes should have size (sampleKnownUris.size + sampleBootstrapNodes.size)
+ }
+ peerDiscoveryManager ! PeerDiscoveryManager.Stop
+ eventually {
+ getPeers.futureValue.nodes should have size (sampleKnownUris.size)
+ }
+ }
}
}
+ it should "propagate any error from the service to the caller" in test {
+ new Fixture {
+ override lazy val discoveryConfig =
+ defaultConfig.copy(discoveryEnabled = true, reuseKnownNodes = false, bootstrapNodes = Set.empty)
+
+ (discoveryService.getNodes _)
+ .expects()
+ .returning(Task.raiseError(new RuntimeException("Oh no!") with NoStackTrace))
+ .atLeastOnce()
+
+ override def test(): Unit = {
+ peerDiscoveryManager ! PeerDiscoveryManager.Start
+ eventually {
+ a[RuntimeException] shouldBe thrownBy(Await.result(getPeers, 50.millis))
+ }
+ }
+ }
+ }
}
diff --git a/src/test/scala/io/iohk/ethereum/network/discovery/codecs/EIP8CodecsSpec.scala b/src/test/scala/io/iohk/ethereum/network/discovery/codecs/EIP8CodecsSpec.scala
index b10b6200c0..89e5360557 100644
--- a/src/test/scala/io/iohk/ethereum/network/discovery/codecs/EIP8CodecsSpec.scala
+++ b/src/test/scala/io/iohk/ethereum/network/discovery/codecs/EIP8CodecsSpec.scala
@@ -104,19 +104,6 @@ class EIP8CodecsSpec extends AnyFlatSpec with Matchers {
)
)
- // Test the original Mantis types to make sure we're on the right track.
- EIP8TestVectors.foreach { case EIP8TestVector(description, data, _) =>
- it should s"decode a ${description} into an original Mantis type" in {
- import akka.util.ByteString
- import io.iohk.ethereum.network.discovery.Packet
-
- val bits = BitVector.fromHex(data).get
- val bytes = ByteString(bits.toByteArray)
- val packet = Packet(bytes)
- val message = io.iohk.ethereum.network.discovery.extractMessage(packet).get
- }
- }
-
// Test the RLP decoders in isolation, without crypto.
EIP8TestVectors.foreach { case EIP8TestVector(description, data, test) =>
it should s"decode/encode a ${description}" in {
diff --git a/src/test/scala/io/iohk/ethereum/network/discovery/codecs/ENRCodecsSpec.scala b/src/test/scala/io/iohk/ethereum/network/discovery/codecs/ENRCodecsSpec.scala
index 51f133e646..8d7a19ded6 100644
--- a/src/test/scala/io/iohk/ethereum/network/discovery/codecs/ENRCodecsSpec.scala
+++ b/src/test/scala/io/iohk/ethereum/network/discovery/codecs/ENRCodecsSpec.scala
@@ -3,17 +3,19 @@ package io.iohk.ethereum.network.discovery.codecs
import org.scalatest.matchers.should.Matchers
import org.scalatest.flatspec.AnyFlatSpec
import io.iohk.scalanet.discovery.ethereum.{Node, EthereumNodeRecord}
-import io.iohk.scalanet.discovery.crypto.{SigAlg, PrivateKey}
+import io.iohk.scalanet.discovery.ethereum.v4.Payload.ENRResponse
+import io.iohk.scalanet.discovery.crypto.{SigAlg, PrivateKey, PublicKey}
import io.iohk.scalanet.discovery.hash.{Hash, Keccak256}
import io.iohk.ethereum.network.discovery.Secp256k1SigAlg
import io.iohk.ethereum.rlp
-import io.iohk.ethereum.rlp.{RLPList, RLPEncoder}
+import io.iohk.ethereum.rlp.{RLPList, RLPEncoder, RLPValue}
import io.iohk.ethereum.rlp.RLPImplicits._
import io.iohk.ethereum.rlp.RLPImplicitConversions._
import scodec.bits.{ByteVector, HexStringSyntax}
import java.net.InetAddress
import io.iohk.ethereum.rlp.RLPEncodeable
import io.iohk.ethereum.rlp.RLPDecoder
+import scala.language.implicitConversions
class ENRCodecsSpec extends AnyFlatSpec with Matchers {
@@ -76,7 +78,7 @@ class ENRCodecsSpec extends AnyFlatSpec with Matchers {
// Ignoring the signature, taking items up to where "tcp" would be.
compare(list.items.drop(1).take(7), enrRLP.items.drop(1).take(7))
- // TODO: The example is encoded differently because it uses the minimum
+ // The example is encoded differently because it uses the minimum
// length for the port, whereas the one in Scalanet just converts the
// Int to BigEndian bytes and includes them in the attributes.
//
@@ -129,4 +131,48 @@ class ENRCodecsSpec extends AnyFlatSpec with Matchers {
// should be the 64 byte public key, at least I thought so based on the spec.
Keccak256(publicKey) shouldBe nodeId
}
+
+ it should "handle arbitrary key-value pairs" in {
+ implicit def `BitVector => Array[Byte]`(b: ByteVector): Array[Byte] = b.toArray
+
+ // This is a record returned by one of the nodes on the mordor testnet.
+ val enrResponseWithNonByteValue = RLPList(
+ RLPValue(hex"b800b3f96bc648c5008b3734f591aedfcb26fff3f709ccc43d70de90ed10ab47"),
+ RLPList(
+ RLPValue(
+ hex"3a5b30bccf05526253d5145239c6b07073dfc6747b54a1ae9ee81da7b2ac9caf72e82e90c9d47ca909a1a6348dde6c79cd7df59fd281c246d01a66913551609d"
+ ),
+ RLPValue(hex"31"),
+ // key: "eth"
+ RLPValue(hex"657468"),
+ // not an RLPValue, unlike the rest which are bytes
+ RLPList(
+ RLPList(RLPValue(hex"66b5c286"), RLPValue(Array.empty))
+ ),
+ RLPValue(hex"6964"),
+ RLPValue(hex"7634"),
+ RLPValue(hex"6970"),
+ RLPValue(hex"339ebf2b"),
+ RLPValue(hex"736563703235366b31"),
+ RLPValue(hex"0215b6ae4e9e18772f297c90d83645b0fbdb56667ce2d747d6d575b21d7b60c2d3"),
+ RLPValue(hex"746370"),
+ RLPValue(hex"a113"),
+ RLPValue(hex"756470"),
+ RLPValue(hex"a113")
+ )
+ )
+
+ val enr = RLPDecoder.decode[ENRResponse](enrResponseWithNonByteValue).enr
+
+ enr.content.attrs should have size 6
+
+ // We have to be able to reserialize the whole signed content,
+ // otherwise we won't be able to verify the signature.
+ val publicKey = PublicKey(enr.content.attrs(EthereumNodeRecord.Keys.secp256k1).toBitVector)
+ EthereumNodeRecord.validateSignature(enr, publicKey).require shouldBe true
+
+ val address = Node.Address.fromEnr(enr).get
+ address.tcpPort should be > 0
+ address.udpPort should be > 0
+ }
}
diff --git a/src/test/scala/io/iohk/ethereum/network/discovery/codecs/RLPCodecsSpec.scala b/src/test/scala/io/iohk/ethereum/network/discovery/codecs/RLPCodecsSpec.scala
index 5d22caf352..38b24a0fa2 100644
--- a/src/test/scala/io/iohk/ethereum/network/discovery/codecs/RLPCodecsSpec.scala
+++ b/src/test/scala/io/iohk/ethereum/network/discovery/codecs/RLPCodecsSpec.scala
@@ -14,6 +14,7 @@ import java.net.InetAddress
import scala.util.Random
import org.scalactic.Equality
import scala.reflect.ClassTag
+import _root_.io.iohk.ethereum.rlp.RLPException
class RLPCodecsSpec extends AnyFlatSpec with Matchers {
import io.iohk.ethereum.rlp.RLPImplicitConversions._
@@ -79,6 +80,33 @@ class RLPCodecsSpec extends AnyFlatSpec with Matchers {
RLPDecoder.decode[Payload.Ping](rlp) shouldBe ping
}
+ it should "reject a Node.Address with more than 3 fields" in {
+ val rlp = RLPList(
+ localhost,
+ 123,
+ 456,
+ 789
+ )
+
+ an[RLPException] should be thrownBy {
+ RLPDecoder.decode[Node.Address](rlp)
+ }
+ }
+
+ it should "reject a Node with more than 4 fields" in {
+ val rlp = RLPList(
+ localhost,
+ 123,
+ 456,
+ randomBytes(64),
+ "only Payloads accept extra fields"
+ )
+
+ an[RLPException] should be thrownBy {
+ RLPDecoder.decode[Node.Address](rlp)
+ }
+ }
+
// The following tests demonstrate what each payload looks like when encoded to RLP,
// because the auto-derivation makes it opaque.
abstract class RLPFixture[T <: Payload: RLPEncoder: RLPDecoder: ClassTag] {
diff --git a/src/universal/bin/mantis-launcher b/src/universal/bin/mantis-launcher
index 5ebb44edc4..8d4ee8e933 100755
--- a/src/universal/bin/mantis-launcher
+++ b/src/universal/bin/mantis-launcher
@@ -1,5 +1,8 @@
#!/bin/bash
+DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+cd $DIR/..
+
chain="$1"
if [ -z "$chain" ]
then
@@ -7,4 +10,4 @@ then
else
shift
./bin/mantis -Dconfig.file=./conf/"$chain".conf "$@"
-fi
\ No newline at end of file
+fi
diff --git a/src/universal/bin/mantis-vm b/src/universal/bin/mantis-vm
index c9fe7a1219..ced87ae2b4 100755
--- a/src/universal/bin/mantis-vm
+++ b/src/universal/bin/mantis-vm
@@ -1,3 +1,6 @@
#!/bin/bash
+DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+cd $DIR/..
+
./bin/mantis vm-server "$@"