diff --git a/scalanet/discovery/src/io/iohk/scalanet/discovery/ethereum/EthereumNodeRecord.scala b/scalanet/discovery/src/io/iohk/scalanet/discovery/ethereum/EthereumNodeRecord.scala index 8c1c952d..0f7e7106 100644 --- a/scalanet/discovery/src/io/iohk/scalanet/discovery/ethereum/EthereumNodeRecord.scala +++ b/scalanet/discovery/src/io/iohk/scalanet/discovery/ethereum/EthereumNodeRecord.scala @@ -22,6 +22,7 @@ object EthereumNodeRecord { case class Content( // Nodes should increment this number whenever their properties change, like their address, and re-publish. seq: Long, + // Normally clients treat the values as RLP, however we don't have access to the RLP types here, hence it's just bytes. attrs: SortedMap[ByteVector, ByteVector] ) @@ -52,6 +53,26 @@ object EthereumNodeRecord { /** IPv6-specific UDP port, big endian integer */ val udp6 = key("udp6") + + /** The keys above have pre-defined meaning, but there can be arbitrary entries in the map. */ + val Predefined: Set[ByteVector] = Set(id, secp256k1, ip, tcp, udp, ip6, tcp6, udp6) + } + + def apply(signature: Signature, seq: Long, attrs: (ByteVector, ByteVector)*): EthereumNodeRecord = + EthereumNodeRecord( + signature, + EthereumNodeRecord.Content(seq, SortedMap(attrs: _*)) + ) + + def apply(privateKey: PrivateKey, seq: Long, attrs: (ByteVector, ByteVector)*)( + implicit sigalg: SigAlg, + codec: Codec[Content] + ): Attempt[EthereumNodeRecord] = { + val content = EthereumNodeRecord.Content(seq, SortedMap(attrs: _*)) + codec.encode(content).map { data => + val sig = sigalg.removeRecoveryId(sigalg.sign(privateKey, data)) + EthereumNodeRecord(sig, content) + } } def fromNode(node: Node, privateKey: PrivateKey, seq: Long)( @@ -64,20 +85,15 @@ object EthereumNodeRecord { else (Keys.ip, Keys.tcp, Keys.udp) - val content = Content( - seq, - SortedMap( - Keys.id -> ByteVector("v4".getBytes(StandardCharsets.UTF_8)), - Keys.secp256k1 -> sigalg.compressPublicKey(sigalg.toPublicKey(privateKey)).toByteVector, - ipKey -> ByteVector(node.address.ip.getAddress), - tcpKey -> ByteVector.fromInt(node.address.tcpPort), - udpKey -> ByteVector.fromInt(node.address.udpPort) - ) + val attrs = List( + Keys.id -> ByteVector("v4".getBytes(StandardCharsets.UTF_8)), + Keys.secp256k1 -> sigalg.compressPublicKey(sigalg.toPublicKey(privateKey)).toByteVector, + ipKey -> ByteVector(node.address.ip.getAddress), + tcpKey -> ByteVector.fromInt(node.address.tcpPort), + udpKey -> ByteVector.fromInt(node.address.udpPort) ) - codec.encode(content).map { data => - val sig = sigalg.removeRecoveryId(sigalg.sign(privateKey, data)) - EthereumNodeRecord(sig, content) - } + + apply(privateKey, seq, attrs: _*) } def validateSignature( diff --git a/scalanet/discovery/src/io/iohk/scalanet/discovery/ethereum/Node.scala b/scalanet/discovery/src/io/iohk/scalanet/discovery/ethereum/Node.scala index fde7fdbc..be60808c 100644 --- a/scalanet/discovery/src/io/iohk/scalanet/discovery/ethereum/Node.scala +++ b/scalanet/discovery/src/io/iohk/scalanet/discovery/ethereum/Node.scala @@ -42,7 +42,7 @@ object Node { tryParse[InetAddress](key)(bytes => InetAddress.getByAddress(bytes.toArray)) def tryParsePort(key: ByteVector): Option[Int] = - tryParse[Int](key)(bytes => bytes.toInt()) + tryParse[Int](key)(bytes => bytes.toInt(signed = false)) for { ip <- tryParseIP(Keys.ip6) orElse tryParseIP(Keys.ip) diff --git a/scalanet/discovery/src/io/iohk/scalanet/discovery/ethereum/v4/DiscoveryNetwork.scala b/scalanet/discovery/src/io/iohk/scalanet/discovery/ethereum/v4/DiscoveryNetwork.scala index b2a8bed1..9f411414 100644 --- a/scalanet/discovery/src/io/iohk/scalanet/discovery/ethereum/v4/DiscoveryNetwork.scala +++ b/scalanet/discovery/src/io/iohk/scalanet/discovery/ethereum/v4/DiscoveryNetwork.scala @@ -21,6 +21,7 @@ import scala.util.control.{NonFatal, NoStackTrace} import scodec.bits.BitVector import io.iohk.scalanet.discovery.ethereum.v4.Payload.Neighbors import java.net.InetSocketAddress +import io.iohk.scalanet.discovery.hash.Keccak256 /** Present a stateless facade implementing the RPC methods * that correspond to the discovery protocol messages on top @@ -68,9 +69,9 @@ object DiscoveryNetwork { import DiscoveryRPC.ENRSeq import Payload._ - private val expirationMillis = config.messageExpiration.toMillis - private val maxClockDriftMillis = config.maxClockDrift.toMillis - private val currentTimeMillis = clock.realTime(MILLISECONDS) + private val expirationSeconds = config.messageExpiration.toSeconds + private val maxClockDriftSeconds = config.maxClockDrift.toSeconds + private val currentTimeSeconds = clock.realTime(SECONDS) private val maxNeighborsPerPacket = getMaxNeighborsPerPacket @@ -117,7 +118,7 @@ object DiscoveryNetwork { .toIterant .mapEval { case MessageReceived(packet) => - currentTimeMillis.flatMap { timestamp => + currentTimeSeconds.flatMap { timestamp => Packet.unpack(packet) match { case Attempt.Successful((payload, remotePublicKey)) => payload match { @@ -126,7 +127,7 @@ object DiscoveryNetwork { Task.unit case p: Payload.HasExpiration[_] if isExpired(p, timestamp) => - Task(logger.debug(s"Ignoring expired request from ${channel.to}")) + Task(logger.debug(s"Ignoring expired request from ${channel.to}; ${p.expiration} < $timestamp")) case p: Payload.Request => handleRequest(handler, channel, remotePublicKey, packet.hash, p) @@ -214,7 +215,7 @@ object DiscoveryNetwork { private def setExpiration(payload: Payload): Task[Payload] = { payload match { case p: Payload.HasExpiration[_] => - currentTimeMillis.map(t => p.withExpiration(t + expirationMillis)) + currentTimeSeconds.map(t => p.withExpiration(t + expirationSeconds)) case p => Task.pure(p) } @@ -230,7 +231,7 @@ object DiscoveryNetwork { * our expiration time to 1 hour wouldn't help in this case. */ private def isExpired(payload: HasExpiration[_], now: Long): Boolean = - payload.expiration < now - maxClockDriftMillis + payload.expiration < now - maxClockDriftSeconds /** Ping a peer. */ override val ping = (peer: Peer[A]) => @@ -241,8 +242,14 @@ object DiscoveryNetwork { Ping(version = 4, from = localNodeAddress, to = toNodeAddress(peer.address), 0, localEnrSeq) ) .flatMap { packet => + // Workaround for 1.10 Parity nodes that send back the hash of the Ping data + // rather than the hash of the whole packet (over signature + data). + // https://github.com/paritytech/parity/issues/8038 + // https://github.com/ethereumproject/go-ethereum/issues/312 + val dataHash = Keccak256(packet.data) + channel.collectFirstResponse(peer.id) { - case Pong(_, pingHash, _, maybeRemoteEnrSeq) if pingHash == packet.hash => + case Pong(_, pingHash, _, maybeRemoteEnrSeq) if pingHash == packet.hash || pingHash == dataHash => maybeRemoteEnrSeq } } @@ -317,7 +324,7 @@ object DiscoveryNetwork { case MessageReceived(packet) => packet } .mapEval { packet => - currentTimeMillis.flatMap { timestamp => + currentTimeSeconds.flatMap { timestamp => Packet.unpack(packet) match { case Attempt.Successful((payload, remotePublicKey)) => payload match { @@ -329,7 +336,9 @@ object DiscoveryNetwork { Task.pure(None) case p: Payload.HasExpiration[_] if isExpired(p, timestamp) => - Task(logger.debug(s"Ignoring expired response from ${channel.to}")).as(None) + Task( + logger.debug(s"Ignoring expired response from ${channel.to}; ${p.expiration} < $timestamp") + ).as(None) case p: Payload.Response => Task.pure(Some(p)) diff --git a/scalanet/discovery/src/io/iohk/scalanet/discovery/ethereum/v4/DiscoveryService.scala b/scalanet/discovery/src/io/iohk/scalanet/discovery/ethereum/v4/DiscoveryService.scala index 56ddfcf9..f45a693f 100644 --- a/scalanet/discovery/src/io/iohk/scalanet/discovery/ethereum/v4/DiscoveryService.scala +++ b/scalanet/discovery/src/io/iohk/scalanet/discovery/ethereum/v4/DiscoveryService.scala @@ -815,7 +815,7 @@ object DiscoveryService { val tryEnroll = for { nodeId <- stateRef.get.map(_.node.id) bootstrapPeers = config.knownPeers.toList.map(toPeer).filterNot(_.id == nodeId) - maybeBootstrapEnrs <- bootstrapPeers.traverse(fetchEnr(_, delay = true)) + maybeBootstrapEnrs <- Task.parTraverseN(config.kademliaAlpha)(bootstrapPeers)(fetchEnr(_, delay = true)) result <- if (maybeBootstrapEnrs.exists(_.isDefined)) { lookup(nodeId).as(true) } else { diff --git a/scalanet/discovery/src/io/iohk/scalanet/discovery/ethereum/v4/Payload.scala b/scalanet/discovery/src/io/iohk/scalanet/discovery/ethereum/v4/Payload.scala index 42bf98a8..0f5c6ea5 100644 --- a/scalanet/discovery/src/io/iohk/scalanet/discovery/ethereum/v4/Payload.scala +++ b/scalanet/discovery/src/io/iohk/scalanet/discovery/ethereum/v4/Payload.scala @@ -18,7 +18,7 @@ object Payload { sealed trait Response extends Payload trait HasExpiration[T <: Payload] { - // Absolute UNIX timestamp. + // Absolute UNIX timestamp: seconds since epoch. def expiration: Long def withExpiration(at: Long): T } @@ -28,7 +28,6 @@ object Payload { version: Int, from: Node.Address, to: Node.Address, - // Absolute UNIX timestamp. expiration: Long, // Current ENR sequence number of the sender. enrSeq: Option[Long] diff --git a/scalanet/discovery/test/src/io/iohk/scalanet/discovery/ethereum/v4/DiscoveryNetworkSpec.scala b/scalanet/discovery/test/src/io/iohk/scalanet/discovery/ethereum/v4/DiscoveryNetworkSpec.scala index ba835bf0..b079a1a3 100644 --- a/scalanet/discovery/test/src/io/iohk/scalanet/discovery/ethereum/v4/DiscoveryNetworkSpec.scala +++ b/scalanet/discovery/test/src/io/iohk/scalanet/discovery/ethereum/v4/DiscoveryNetworkSpec.scala @@ -644,7 +644,7 @@ class DiscoveryNetworkSpec extends AsyncFlatSpec with Matchers { _ <- network.startHandling(handleWithSome) channel <- peerGroup.createServerChannel(from = remoteAddress) (request: Payload) = requestMap(rpc) match { - case p: Payload.HasExpiration[_] => p.withExpiration(System.currentTimeMillis - 5000) + case p: Payload.HasExpiration[_] => p.withExpiration(currentTimeSeconds - 5) case p => p } _ <- channel.sendPayloadToSUT(request, remotePrivateKey) @@ -746,7 +746,7 @@ class DiscoveryNetworkSpec extends AsyncFlatSpec with Matchers { } def packetSizeOfNNeighbors(n: Int) = { - val neighbours = Neighbors(List.fill(n)(randomIPv6Node), System.currentTimeMillis) + val neighbours = Neighbors(List.fill(n)(randomIPv6Node), expiration = currentTimeSeconds) val (_, privateKey) = sigalg.newKeyPair val packet = Packet.pack(neighbours, privateKey).require val packetSize = packet.hash.size + packet.signature.size + packet.data.size @@ -789,6 +789,8 @@ object DiscoveryNetworkSpec extends Matchers { maxClockDrift = Duration.Zero ) + def currentTimeSeconds = System.currentTimeMillis / 1000 + trait Fixture { // Implement `test` to assert something. def test: Task[Assertion] @@ -843,14 +845,14 @@ object DiscoveryNetworkSpec extends Matchers { ).runSyncUnsafe() def assertExpirationSet(expiration: Long) = - expiration shouldBe (System.currentTimeMillis + config.messageExpiration.toMillis) +- 3000 + expiration shouldBe (currentTimeSeconds + config.messageExpiration.toSeconds) +- 3 def validExpiration = - System.currentTimeMillis + config.messageExpiration.toMillis + currentTimeSeconds + config.messageExpiration.toSeconds // Anything in the past is invalid. def invalidExpiration = - System.currentTimeMillis - 1 + currentTimeSeconds - 1 implicit class ChannelOps(channel: MockChannel[InetSocketAddress, Packet]) { def sendPayloadToSUT(