diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index 3f61c53f2b..13163b511f 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -4,10 +4,10 @@ import chronos, chronicles, confutils, serialization/errors, eth/trie/db, eth/trie/backends/rocksdb_backend, eth/async_utils, spec/[bitfield, datatypes, digest, crypto, beaconstate, helpers, validator], - conf, time, - state_transition, fork_choice, ssz, beacon_chain_db, validator_pool, extras, - attestation_pool, block_pool, eth2_network, beacon_node_types, - mainchain_monitor, trusted_state_snapshots, version + conf, time, state_transition, fork_choice, ssz, beacon_chain_db, + validator_pool, extras, attestation_pool, block_pool, eth2_network, + beacon_node_types, mainchain_monitor, trusted_state_snapshots, version, + sync_protocol, request_manager const topicBeaconBlocks = "ethereum/2.1/beacon_chain/blocks" @@ -18,13 +18,7 @@ const genesisFile = "genesis.json" testnetsBaseUrl = "https://serenity-testnets.status.im" -# ################################################# -# Careful handling of beacon_node <-> sync_protocol -# to avoid recursive dependencies proc onBeaconBlock*(node: BeaconNode, blck: BeaconBlock) {.gcsafe.} - # Forward decl for sync_protocol -import sync_protocol, request_manager -# ################################################# func localValidatorsDir(conf: BeaconNodeConf): string = conf.dataDir / "validators" @@ -86,13 +80,12 @@ proc saveValidatorKey(keyName, key: string, conf: BeaconNodeConf) = writeFile(outputFile, key) info "Imported validator key", file = outputFile -proc persistentNodeId*(conf: BeaconNodeConf): string = - ($ensureNetworkKeys(conf).pubKey)[0..5] - proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async.} = new result + result.onBeaconBlock = onBeaconBlock result.config = conf - result.nickname = if conf.nodename == "auto": persistentNodeId(conf) + result.networkIdentity = getPersistentNetIdentity(conf) + result.nickname = if conf.nodename == "auto": shortForm(result.networkIdentity) else: conf.nodename template fail(args: varargs[untyped]) = @@ -182,6 +175,7 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async # TODO sync is called when a remote peer is connected - is that the right # time to do so? let sync = result.network.protocolState(BeaconSync) + sync.chainId = 0 # TODO specify chainId sync.networkId = result.networkMetadata.networkId sync.node = result sync.db = result.db @@ -210,11 +204,10 @@ template withState( body proc connectToNetwork(node: BeaconNode) {.async.} = - let localKeys = ensureNetworkKeys(node.config) var bootstrapNodes = newSeq[BootstrapAddr]() for bootNode in node.networkMetadata.bootstrapNodes: - if bootNode.pubkey == localKeys.pubKey: + if bootNode.isSameNode(node.networkIdentity): node.isBootstrapNode = true else: bootstrapNodes.add bootNode @@ -278,7 +271,7 @@ proc updateHead(node: BeaconNode, slot: Slot): BlockRef = # TODO move all of this logic to BlockPool debug "Preparing for fork choice", stateRoot = shortLog(root), - connectedPeers = node.network.connectedPeers, + connectedPeers = node.network.peersCount, stateSlot = humaneSlotNum(state.slot), stateEpoch = humaneEpochNum(state.slot.slotToEpoch) @@ -656,7 +649,7 @@ proc onSecond(node: BeaconNode, moment: Moment) {.async.} = if missingBlocks.len > 0: info "Requesting detected missing blocks", missingBlocks node.requestManager.fetchAncestorBlocks(missingBlocks) do (b: BeaconBlock): - node.onBeaconBlock(b) + onBeaconBlock(node ,b) let nextSecond = max(Moment.now(), moment + chronos.seconds(1)) addTimer(nextSecond) do (p: pointer): @@ -664,7 +657,7 @@ proc onSecond(node: BeaconNode, moment: Moment) {.async.} = proc run*(node: BeaconNode) = waitFor node.network.subscribe(topicBeaconBlocks) do (blck: BeaconBlock): - node.onBeaconBlock(blck) + onBeaconBlock(node, blck) waitFor node.network.subscribe(topicAttestations) do (attestation: Attestation): node.onAttestation(attestation) @@ -729,7 +722,13 @@ when isMainModule: for i in config.firstValidator.int ..< config.totalValidators.int: let depositFile = config.validatorsDir / validatorFileBaseName(i) & ".deposit.json" - deposits.add Json.loadFile(depositFile, Deposit) + try: + deposits.add Json.loadFile(depositFile, Deposit) + except SerializationError as err: + stderr.write "Error while loading a deposit file:\n" + stderr.write err.formatMsg(depositFile), "\n" + stderr.write "Please regenerate the deposit files by running validator_keygen again\n" + quit 1 let initialState = get_genesis_beacon_state( deposits, diff --git a/beacon_chain/beacon_node_types.nim b/beacon_chain/beacon_node_types.nim index b7da13a9f8..a0a00f5348 100644 --- a/beacon_chain/beacon_node_types.nim +++ b/beacon_chain/beacon_node_types.nim @@ -1,21 +1,8 @@ -import # Beacon Node - eth/[p2p, keys], - spec/[bitfield, digest], - beacon_chain_db, conf, mainchain_monitor, eth2_network, - ./time - -import # Attestation Pool +import + sets, deques, tables, + eth/keys, spec/[bitfield, datatypes, crypto, digest], - deques, tables - # block_pool - -import # Block Pool - spec/[datatypes, digest], - beacon_chain_db, - tables - -import # Validator Pool - spec/crypto, tables + beacon_chain_db, conf, mainchain_monitor, eth2_network, time type @@ -26,18 +13,19 @@ type # ############################################# BeaconNode* = ref object nickname*: string - network*: EthereumNode + network*: Eth2Node + networkIdentity*: Eth2NodeIdentity networkMetadata*: NetworkMetadata requestManager*: RequestManager isBootstrapNode*: bool db*: BeaconChainDB config*: BeaconNodeConf - keys*: KeyPair attachedValidators*: ValidatorPool blockPool*: BlockPool attestationPool*: AttestationPool mainchainMonitor*: MainchainMonitor beaconClock*: BeaconClock + onBeaconBlock*: proc (node: BeaconNode, blck: BeaconBlock) {.gcsafe.} stateCache*: StateData ##\ ## State cache object that's used as a scratch pad @@ -258,10 +246,10 @@ type validators*: Table[ValidatorPubKey, AttachedValidator] RequestManager* = object - network*: EthereumNode + network*: Eth2Node NetworkMetadata* = object - networkId*: uint64 + networkId*: uint8 networkGeneration*: uint64 genesisRoot*: Eth2Digest bootstrapNodes*: seq[BootstrapAddr] diff --git a/beacon_chain/conf.nim b/beacon_chain/conf.nim index a0efd1066a..403c53e251 100644 --- a/beacon_chain/conf.nim +++ b/beacon_chain/conf.nim @@ -4,7 +4,7 @@ import spec/[crypto, datatypes], time, version export - defs + defs, enabledLogLevel const DEFAULT_NETWORK* {.strdefine.} = "testnet0" @@ -79,7 +79,7 @@ type of createTestnet: networkId* {. - desc: "An unique numeric identifier for the network".}: uint64 + desc: "An unique numeric identifier for the network".}: uint8 validatorsDir* {. desc: "Directory containing validator descriptors named vXXXXXXX.deposit.json" diff --git a/beacon_chain/eth2_network.nim b/beacon_chain/eth2_network.nim index 03f1a5a4ee..5c9bf03b3e 100644 --- a/beacon_chain/eth2_network.nim +++ b/beacon_chain/eth2_network.nim @@ -1,15 +1,58 @@ import - options, chronos, json_serialization, strutils, - chronicles, + options, tables, + chronos, json_serialization, strutils, chronicles, eth/net/nat, spec/digest, version, conf const clientId = "Nimbus beacon node v" & fullVersionStr() -when useRLPx: +export + version + +let + globalListeningAddr = parseIpAddress("0.0.0.0") + +proc setupNat(conf: BeaconNodeConf): tuple[ip: IpAddress, + tcpPort: Port, + udpPort: Port] = + # defaults + result.ip = globalListeningAddr + result.tcpPort = Port(conf.tcpPort) + result.udpPort = Port(conf.udpPort) + + var nat: NatStrategy + case conf.nat.toLowerAscii: + of "any": + nat = NatAny + of "none": + nat = NatNone + of "upnp": + nat = NatUpnp + of "pmp": + nat = NatPmp + else: + if conf.nat.startsWith("extip:") and isIpAddress(conf.nat[6..^1]): + # any required port redirection is assumed to be done by hand + result.ip = parseIpAddress(conf.nat[6..^1]) + nat = NatNone + else: + error "not a valid NAT mechanism, nor a valid IP address", value = conf.nat + quit(QuitFailure) + + if nat != NatNone: + let extIP = getExternalIP(nat) + if extIP.isSome: + result.ip = extIP.get() + let extPorts = redirectPorts(tcpPort = result.tcpPort, + udpPort = result.udpPort, + description = clientId) + if extPorts.isSome: + (result.tcpPort, result.udpPort) = extPorts.get() + +when networkBackend == rlpxBackend: import os, - eth/[rlp, p2p, keys, net/nat], gossipsub_protocol, + eth/[rlp, p2p, keys], gossipsub_protocol, eth/p2p/peer_pool # for log on connected peers export @@ -17,49 +60,14 @@ when useRLPx: const netBackendName* = "rlpx" + IrrelevantNetwork* = UselessPeer type Eth2Node* = EthereumNode + Eth2NodeIdentity* = KeyPair BootstrapAddr* = ENode - template libp2pProtocol*(name, version: string) {.pragma.} - - proc setupNat(conf: BeaconNodeConf): tuple[ip: IpAddress, tcpPort: Port, udpPort: Port] = - # defaults - result.ip = parseIpAddress("127.0.0.1") - result.tcpPort = Port(conf.tcpPort) - result.udpPort = Port(conf.udpPort) - - var nat: NatStrategy - case conf.nat.toLowerAscii: - of "any": - nat = NatAny - of "none": - nat = NatNone - of "upnp": - nat = NatUpnp - of "pmp": - nat = NatPmp - else: - if conf.nat.startsWith("extip:") and isIpAddress(conf.nat[6..^1]): - # any required port redirection is assumed to be done by hand - result.ip = parseIpAddress(conf.nat[6..^1]) - nat = NatNone - else: - error "not a valid NAT mechanism, nor a valid IP address", value = conf.nat - quit(QuitFailure) - - if nat != NatNone: - let extIP = getExternalIP(nat) - if extIP.isSome: - result.ip = extIP.get() - let extPorts = redirectPorts(tcpPort = result.tcpPort, - udpPort = result.udpPort, - description = clientId) - if extPorts.isSome: - (result.tcpPort, result.udpPort) = extPorts.get() - - proc ensureNetworkKeys*(conf: BeaconNodeConf): KeyPair = + proc getPersistentNetIdentity*(conf: BeaconNodeConf): Eth2NodeIdentity = let privateKeyFile = conf.dataDir / "network.privkey" var privKey: PrivateKey if not fileExists(privateKeyFile): @@ -74,10 +82,16 @@ when useRLPx: proc getPersistenBootstrapAddr*(conf: BeaconNodeConf, ip: IpAddress, port: Port): BootstrapAddr = let - keys = ensureNetworkKeys(conf) + identity = getPersistentNetIdentity(conf) address = Address(ip: ip, tcpPort: port, udpPort: port) - initENode(keys.pubKey, address) + initENode(identity.pubKey, address) + + proc isSameNode*(bootstrapNode: BootstrapAddr, id: Eth2NodeIdentity): bool = + bootstrapNode.pubKey == id.pubKey + + proc shortForm*(id: Eth2NodeIdentity): string = + ($id.pubKey)[0..5] proc writeValue*(writer: var JsonWriter, value: BootstrapAddr) {.inline.} = writer.writeValue $value @@ -87,7 +101,7 @@ when useRLPx: proc createEth2Node*(conf: BeaconNodeConf): Future[EthereumNode] {.async.} = let - keys = ensureNetworkKeys(conf) + keys = getPersistentNetIdentity(conf) (ip, tcpPort, udpPort) = setupNat(conf) address = Address(ip: ip, tcpPort: tcpPort, @@ -104,22 +118,31 @@ when useRLPx: proc init*(T: type BootstrapAddr, str: string): T = initENode(str) - func connectedPeers*(enode: EthereumNode): int = - enode.peerPool.len + func peersCount*(node: Eth2Node): int = + node.peerPool.len else: import - libp2p/daemon/daemonapi, - libp2p_backend + os, random, std_shims/io, + libp2p/crypto/crypto, libp2p/daemon/daemonapi, eth/async_utils, + ssz - export - libp2p_backend + when networkBackend == libp2pSpecBackend: + import libp2p_spec_backend + export libp2p_spec_backend + const netBackendName* = "libp2p_spec" + + else: + import libp2p_backend + export libp2p_backend + const netBackendName* = "libp2p_native" type BootstrapAddr* = PeerInfo + Eth2NodeIdentity* = PeerInfo const - netBackendName* = "libp2p" + networkKeyFilename = "privkey.protobuf" proc writeValue*(writer: var JsonWriter, value: PeerID) {.inline.} = writer.writeValue value.pretty @@ -136,10 +159,52 @@ else: proc init*(T: type BootstrapAddr, str: string): T = Json.decode(str, PeerInfo) + proc ensureNetworkIdFile(conf: BeaconNodeConf): string = + result = conf.dataDir / networkKeyFilename + if not fileExists(result): + createDir conf.dataDir.string + let pk = PrivateKey.random(Ed25519) + writeFile(result, pk.getBytes) + + proc getPersistentNetIdentity*(conf: BeaconNodeConf): Eth2NodeIdentity = + # Using waitFor here is reasonable, because this proc is needed only + # prior to connecting to the network. The RLPx alternative reads from + # file and it's much easier to use if it's not async. + # TODO: revisit in the future when we have our own Lib2P2 implementation. + let daemon = waitFor newDaemonApi(id = conf.ensureNetworkIdFile) + result = waitFor daemon.identity() + waitFor daemon.close() + + template tcpEndPoint(address, port): auto = + MultiAddress.init(address, IPPROTO_TCP, port) + proc createEth2Node*(conf: BeaconNodeConf): Future[Eth2Node] {.async.} = - var node = new Eth2Node - await node.init() - return node + var + (extIp, extTcpPort, extUdpPort) = setupNat(conf) + hostAddress = tcpEndPoint(globalListeningAddr, Port conf.tcpPort) + announcedAddresses = if extIp != globalListeningAddr: @[] + else: @[tcpEndPoint(extIp, extTcpPort)] + keyFile = conf.ensureNetworkIdFile + + info "Starting LibP2P deamon", hostAddress, announcedAddresses, keyFile + let daemon = await newDaemonApi({PSGossipSub}, + id = keyFile, + hostAddresses = @[hostAddress], + announcedAddresses = announcedAddresses) + + return await Eth2Node.init(daemon) + + proc getPersistenBootstrapAddr*(conf: BeaconNodeConf, + ip: IpAddress, port: Port): BootstrapAddr = + result = getPersistentNetIdentity(conf) + result.addresses = @[tcpEndPoint(ip, port)] + + proc isSameNode*(bootstrapNode: BootstrapAddr, id: Eth2NodeIdentity): bool = + bootstrapNode.peer == id.peer + + proc shortForm*(id: Eth2NodeIdentity): string = + # TODO: Make this shorter + $id proc connectToNetwork*(node: Eth2Node, bootstrapNodes: seq[PeerInfo]) {.async.} = # TODO: perhaps we should do these in parallel @@ -147,7 +212,7 @@ else: try: await node.daemon.connect(bootstrapNode.peer, bootstrapNode.addresses) let peer = node.getPeer(bootstrapNode.peer) - await peer.performProtocolHandshakes() + await initializeConnection(peer) except PeerDisconnected: error "Failed to connect to bootstrap node", node = bootstrapNode @@ -158,3 +223,31 @@ else: proc loadConnectionAddressFile*(filename: string): PeerInfo = Json.loadFile(filename, PeerInfo) + func peersCount*(node: Eth2Node): int = + node.peers.len + + proc makeMessageHandler[MsgType](msgHandler: proc(msg: MsgType)): P2PPubSubCallback = + result = proc(api: DaemonAPI, + ticket: PubsubTicket, + msg: PubSubMessage): Future[bool] {.async.} = + msgHandler SSZ.decode(msg.data, MsgType) + return true + + proc subscribe*[MsgType](node: Eth2Node, + topic: string, + msgHandler: proc(msg: MsgType)) {.async.} = + discard await node.daemon.pubsubSubscribe(topic, makeMessageHandler(msgHandler)) + + proc broadcast*(node: Eth2Node, topic: string, msg: auto) = + traceAsyncErrors node.daemon.pubsubPublish(topic, SSZ.encode(msg)) + + # TODO: + # At the moment, this is just a compatiblity shim for the existing RLPx functionality. + # The filtering is not implemented properly yet. + iterator randomPeers*(node: Eth2Node, maxPeers: int, Protocol: type): Peer = + var peers = newSeq[Peer]() + for _, peer in pairs(node.peers): peers.add peer + shuffle peers + if peers.len > maxPeers: peers.setLen(maxPeers) + for p in peers: yield p + diff --git a/beacon_chain/libp2p_backend.nim b/beacon_chain/libp2p_backend.nim index 957d202fbc..5bce12e735 100644 --- a/beacon_chain/libp2p_backend.nim +++ b/beacon_chain/libp2p_backend.nim @@ -1,11 +1,12 @@ import - options, macros, algorithm, + options, macros, algorithm, tables, std_shims/[macros_shim, tables_shims], chronos, chronicles, libp2p/daemon/daemonapi, faststreams/output_stream, serialization, + eth/p2p/p2p_protocol_dsl, ssz export - daemonapi + daemonapi, p2pProtocol type Eth2Node* = ref object of RootObj @@ -13,14 +14,42 @@ type peers*: Table[PeerID, Peer] protocolStates*: seq[RootRef] + EthereumNode = Eth2Node # needed for the definitions in p2p_backends_helpers + Peer* = ref object - network: Eth2Node - id: PeerID - connectionState: ConnectionState + network*: Eth2Node + id*: PeerID + connectionState*: ConnectionState awaitedMessages: Table[CompressedMsgId, FutureBase] protocolStates*: seq[RootRef] + maxInactivityAllowed*: Duration + + ConnectionState* = enum + None, + Connecting, + Connected, + Disconnecting, + Disconnected + + DisconnectionReason* = enum + UselessPeer + BreachOfProtocol + FaultOrError + + UntypedResponder = object + peer*: Peer + stream*: P2PStream - EthereumNode = Eth2Node # This alias is needed for state_helpers below + Responder*[MsgType] = distinct UntypedResponder + + MessageInfo* = object + name*: string + + # Private fields: + thunk*: ThunkProc + libp2pProtocol: string + printer*: MessageContentPrinter + nextMsgResolver*: NextMsgResolver ProtocolInfoObj* = object name*: string @@ -36,198 +65,220 @@ type ProtocolInfo* = ptr ProtocolInfoObj - MessageInfo* = object - name*: string - - # Private fields: - thunk*: MessageHandler - libp2pProtocol: string - printer*: MessageContentPrinter - nextMsgResolver*: NextMsgResolver - CompressedMsgId = tuple - protocolIndex, msgId: int + protocolIdx, methodId: int - MessageKind* = enum - msgNotification, - msgRequest, - msgResponse + ResponseCode* = enum + Success + EncodingError + InvalidRequest + ServerError PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe.} NetworkStateInitializer* = proc(network: EthereumNode): RootRef {.gcsafe.} - HandshakeStep* = proc(peer: Peer, handshakeStream: P2PStream): Future[void] {.gcsafe.} + HandshakeStep* = proc(peer: Peer, stream: P2PStream): Future[void] {.gcsafe.} DisconnectionHandler* = proc(peer: Peer): Future[void] {.gcsafe.} - MessageHandler* = proc(daemon: DaemonAPI, stream: P2PStream): Future[void] {.gcsafe.} + ThunkProc* = proc(daemon: DaemonAPI, stream: P2PStream): Future[void] {.gcsafe.} MessageContentPrinter* = proc(msg: pointer): string {.gcsafe.} NextMsgResolver* = proc(msgData: SszReader, future: FutureBase) {.gcsafe.} - ConnectionState* = enum - None, - Connecting, - Connected, - Disconnecting, - Disconnected - - UntypedResponse = object - peer*: Peer - stream*: P2PStream - - Response*[MsgType] = distinct UntypedResponse - Bytes = seq[byte] - DisconnectionReason* = enum - UselessPeer - BreachOfProtocol - PeerDisconnected* = object of CatchableError reason*: DisconnectionReason + TransmissionError* = object of CatchableError + const defaultIncomingReqTimeout = 5000 defaultOutgoingReqTimeout = 10000 + HandshakeTimeout = BreachOfProtocol + + IrrelevantNetwork* = UselessPeer + +include eth/p2p/p2p_backends_helpers +include eth/p2p/p2p_tracing +include libp2p_backends_common -var - gProtocols: seq[ProtocolInfo] - -# The variables above are immutable RTTI information. We need to tell -# Nim to not consider them GcSafe violations: -template allProtocols: auto = {.gcsafe.}: gProtocols - -proc disconnect*(peer: Peer) {.async.} = - if peer.connectionState notin {Disconnecting, Disconnected}: - peer.connectionState = Disconnecting - await peer.network.daemon.disconnect(peer.id) - peer.connectionState = Disconnected - peer.network.peers.del(peer.id) - -template raisePeerDisconnected(msg: string, r: DisconnectionReason) = - var e = newException(PeerDisconnected, msg) - e.reason = r - raise e - -proc disconnectAndRaise(peer: Peer, - reason: DisconnectionReason, - msg: string) {.async.} = - let r = reason - await peer.disconnect() - raisePeerDisconnected(msg, reason) - -proc init*(node: Eth2Node) {.async.} = - node.daemon = await newDaemonApi({PSGossipSub}) - node.daemon.userData = node - init node.peers - - newSeq node.protocolStates, allProtocols.len +proc init*(T: type Eth2Node, daemon: DaemonAPI): Future[T] {.async.} = + new result + result.daemon = daemon + result.daemon.userData = result + result.peers = initTable[PeerID, Peer]() + + newSeq result.protocolStates, allProtocols.len for proto in allProtocols: if proto.networkStateInitializer != nil: - node.protocolStates[proto.index] = proto.networkStateInitializer(node) + result.protocolStates[proto.index] = proto.networkStateInitializer(result) for msg in proto.messages: if msg.libp2pProtocol.len > 0: - await node.daemon.addHandler(@[msg.libp2pProtocol], msg.thunk) - -include eth/p2p/p2p_backends_helpers -include eth/p2p/p2p_tracing - -import typetraits + await daemon.addHandler(@[msg.libp2pProtocol], msg.thunk) + +proc readMsg(stream: P2PStream, + MsgType: type, + withResponseCode: bool, + deadline: Future[void]): Future[Option[MsgType]] {.gcsafe.} + +proc readMsgBytes(stream: P2PStream, + withResponseCode: bool, + deadline: Future[void]): Future[Bytes] {.async.} = + if withResponseCode: + var responseCode: byte + var readResponseCode = stream.transp.readExactly(addr responseCode, 1) + await readResponseCode or deadline + if not readResponseCode.finished: return + if responseCode > ResponseCode.high.byte: return + + logScope: responseCode = ResponseCode(responseCode) + case ResponseCode(responseCode) + of InvalidRequest: + debug "P2P request was classified as invalid" + return + of EncodingError, ServerError: + let responseErrMsg = await readMsg(stream, string, false, deadline) + debug "P2P request resulted in error", responseErrMsg + return + of Success: + # The response is OK, the execution continues below + discard -proc readMsg(stream: P2PStream, MsgType: type, - timeout = 10000): Future[Option[MsgType]] {.async.} = - var timeout = sleepAsync timeout var sizePrefix: uint32 var readSizePrefix = stream.transp.readExactly(addr sizePrefix, sizeof(sizePrefix)) - await readSizePrefix or timeout + await readSizePrefix or deadline if not readSizePrefix.finished: return - debug "EXPECTING MSG", msg = MsgType.name, size = sizePrefix.int + if sizePrefix == 0: + debug "Received SSZ with zero size", peer = stream.peer + return var msgBytes = newSeq[byte](sizePrefix.int + sizeof(sizePrefix)) copyMem(addr msgBytes[0], addr sizePrefix, sizeof(sizePrefix)) var readBody = stream.transp.readExactly(addr msgBytes[sizeof(sizePrefix)], sizePrefix.int) - await readBody or timeout + await readBody or deadline if not readBody.finished: return - let decoded = SSZ.decode(msgBytes, MsgType) + return msgBytes + +proc readMsgBytesOrClose(stream: P2PStream, + withResponseCode: bool, + deadline: Future[void]): Future[Bytes] {.async.} = + result = await stream.readMsgBytes(withResponseCode, deadline) + if result.len == 0: await stream.close() + +proc readMsg(stream: P2PStream, + MsgType: type, + withResponseCode: bool, + deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} = + var msgBytes = await stream.readMsgBytesOrClose(withResponseCode, deadline) try: - return some(decoded) - except SerializationError: + if msgBytes.len > 0: return some SSZ.decode(msgBytes, MsgType) + except SerializationError as err: + debug "Failed to decode a network message", + msgBytes, errMsg = err.formatMsg("") return +proc sendErrorResponse(peer: Peer, + stream: P2PStream, + err: ref SerializationError, + msgName: string, + msgBytes: Bytes) {.async.} = + debug "Received an invalid request", + peer, msgName, msgBytes, errMsg = err.formatMsg("") + + var responseCode = byte(EncodingError) + discard await stream.transp.write(addr responseCode, 1) + await stream.close() + +proc sendErrorResponse(peer: Peer, + stream: P2PStream, + responseCode: ResponseCode, + errMsg: string) {.async.} = + debug "Error processing request", + peer, responseCode, errMsg + + var outputStream = init OutputStream + outputStream.append byte(responseCode) + outputStream.appendValue SSZ, errMsg + + discard await stream.transp.write(outputStream.getOutput) + await stream.close() + proc sendMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.async} = var stream = await peer.network.daemon.openStream(peer.id, @[protocolId]) # TODO how does openStream fail? Set a timeout here and handle it let sent = await stream.transp.write(requestBytes) - # TODO: Should I check that `sent` is equal to the desired number of bytes + if sent != requestBytes.len: + raise newException(TransmissionError, "Failed to deliver all bytes") proc sendBytes(stream: P2PStream, bytes: Bytes) {.async.} = let sent = await stream.transp.write(bytes) - # TODO: Should I check that `sent` is equal to the desired number of bytes + if sent != bytes.len: + raise newException(TransmissionError, "Failed to deliver all bytes") proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes, ResponseMsg: type, - timeout = 10000): Future[Option[ResponseMsg]] {.async.} = - var stream = await peer.network.daemon.openStream(peer.id, @[protocolId]) - # TODO how does openStream fail? Set a timeout here and handle it + timeout: Duration): Future[Option[ResponseMsg]] {.gcsafe, async.} = + var deadline = sleepAsync timeout + # Open a new LibP2P stream + var streamFut = peer.network.daemon.openStream(peer.id, @[protocolId]) + await streamFut or deadline + if not streamFut.finished: + return none(ResponseMsg) + + # Send the request + let stream = streamFut.read let sent = await stream.transp.write(requestBytes) - # TODO: Should I check that `sent` is equal to the desired number of bytes - return await stream.readMsg(ResponseMsg, timeout) - -proc handshakeImpl(peer: Peer, - handshakeSendFut: Future[void], - handshakeStream: P2PStream, - timeout: int, - HandshakeType: type): Future[HandshakeType] {.async.} = - await handshakeSendFut - let response = await handshakeStream.readMsg(HandshakeType, timeout) - if response.isSome: - return response.get - else: - await peer.disconnectAndRaise(BreachOfProtocol, "Handshake not completed in time") + if sent != requestBytes.len: + await disconnectAndRaise(peer, FaultOrError, "Incomplete send") -proc p2pStreamName(MsgType: type): string = - mixin msgProtocol, protocolInfo, msgId - MsgType.msgProtocol.protocolInfo.messages[MsgType.msgId].libp2pProtocol + # Read the response + return await stream.readMsg(ResponseMsg, true, deadline) -macro handshake*(peer: Peer, timeout = 10000, sendCall: untyped): untyped = - let - msgName = $sendCall[0] - msgType = newDotExpr(ident"CurrentProtocol", ident(msgName)) - handshakeStream = ident "handshakeStream" - handshakeImpl = bindSym "handshakeImpl" - await = ident "await" +proc exchangeHandshake(peer: Peer, protocolId: string, requestBytes: Bytes, + ResponseMsg: type, + timeout: Duration): Future[ResponseMsg] {.gcsafe, async.} = + var response = await makeEth2Request(peer, protocolId, requestBytes, + ResponseMsg, timeout) + if not response.isSome: + await peer.disconnectAndRaise(BreachOfProtocol, "Failed to complete a handshake") - sendCall.insert(1, handshakeStream) + return response.get - result = quote do: - proc payload(peer: Peer, `handshakeStream`: P2PStream): Future[`msgType`] {.async.} = - var `handshakeStream` = `handshakeStream` - if `handshakeStream` == nil: - `handshakeStream` = `await` openStream(peer.network.daemon, - peer.id, - @[p2pStreamName(`msgType`)], - `timeout`) - return `await` `handshakeImpl`(peer, `sendCall`, `handshakeStream`, `timeout`, `msgType`) +proc p2pStreamName(MsgType: type): string = + mixin msgProtocol, protocolInfo, msgId + MsgType.msgProtocol.protocolInfo.messages[MsgType.msgId].libp2pProtocol - payload(`peer`, `handshakeStream`) +template handshakeImpl(outputStreamVar, handshakeSerializationCall: untyped, + lowLevelThunk: untyped, + HandshakeType: untyped, + # TODO: we cannot use a type parameter above + # because of the following Nim issue: + # + peer: Peer, + stream: P2PStream, + timeout: Duration): auto = + if stream == nil: + var outputStreamVar = init OutputStream + handshakeSerializationCall + exchangeHandshake(peer, p2pStreamName(HandshakeType), + getOutput(outputStreamVar), HandshakeType, timeout) + else: + proc asyncStep: Future[HandshakeType] {.async.} = + let deadline = sleepAsync timeout + var responseFut = nextMsg(peer, HandshakeType) + await lowLevelThunk(peer.network.daemon, stream) or deadline + if not responseFut.finished: + await disconnectAndRaise(peer, BreachOfProtocol, "Failed to complete a handshake") -proc getCompressedMsgId(MsgType: type): CompressedMsgId = - mixin msgProtocol, protocolInfo, msgId - (protocolIndex: MsgType.msgProtocol.protocolInfo.index, msgId: MsgType.msgId) + var outputStreamVar = init OutputStream + append(outputStreamVar, byte(Success)) + handshakeSerializationCall + await sendBytes(stream, getOutput(outputStreamVar)) -proc nextMsg*(peer: Peer, MsgType: type): Future[MsgType] = - ## This procs awaits a specific P2P message. - ## Any messages received while waiting will be dispatched to their - ## respective handlers. The designated message handler will also run - ## to completion before the future returned by `nextMsg` is resolved. - mixin msgProtocol, protocolInfo, msgId - let awaitedMsgId = getCompressedMsgId(MsgType) - let f = getOrDefault(peer.awaitedMessages, awaitedMsgId) - if not f.isNil: - return Future[MsgType](f) + return responseFut.read - newFuture result - peer.awaitedMessages[awaitedMsgId] = result + asyncStep() proc resolveNextMsgFutures(peer: Peer, msg: auto) = type MsgType = type(msg) @@ -242,6 +293,7 @@ proc init*(T: type Peer, network: Eth2Node, id: PeerID): Peer = result.network = network result.awaitedMessages = initTable[CompressedMsgId, FutureBase]() result.connectionState = Connected + result.maxInactivityAllowed = 15.minutes # TODO: Read this from the config newSeq result.protocolStates, allProtocols.len for i in 0 ..< allProtocols.len: let proto = allProtocols[i] @@ -256,30 +308,8 @@ proc performProtocolHandshakes*(peer: Peer) {.async.} = await all(subProtocolsHandshakes) -proc getPeer*(node: Eth2Node, peerId: PeerID): Peer = - result = node.peers.getOrDefault(peerId) - if result == nil: - result = Peer.init(node, peerId) - node.peers[peerId] = result - -proc peerFromStream(daemon: DaemonAPI, stream: P2PStream): Peer = - Eth2Node(daemon.userData).getPeer(stream.peer) - -template getRecipient(peer: Peer): Peer = - peer - -# TODO: this should be removed eventually -template getRecipient(stream: P2PStream): P2PStream = - stream - -template getRecipient(response: Response): Peer = - UntypedResponse(response).peer - -proc messagePrinter[MsgType](msg: pointer): string {.gcsafe.} = - result = "" - # TODO: uncommenting the line below increases the compile-time - # tremendously (for reasons not yet known) - # result = $(cast[ptr MsgType](msg)[]) +template initializeConnection*(peer: Peer): auto = + performProtocolHandshakes(peer) proc initProtocol(name: string, peerInit: PeerStateInitializer, @@ -289,15 +319,9 @@ proc initProtocol(name: string, result.peerStateInitializer = peerInit result.networkStateInitializer = networkInit -proc setEventHandlers(p: ProtocolInfo, - handshake: HandshakeStep, - disconnectHandler: DisconnectionHandler) = - p.handshake = handshake - p.disconnectHandler = disconnectHandler - proc registerMsg(protocol: ProtocolInfo, name: string, - thunk: MessageHandler, + thunk: ThunkProc, libp2pProtocol: string, printer: MessageContentPrinter) = protocol.messages.add MessageInfo(name: name, @@ -305,465 +329,234 @@ proc registerMsg(protocol: ProtocolInfo, libp2pProtocol: libp2pProtocol, printer: printer) -proc registerProtocol(protocol: ProtocolInfo) = - # TODO: This can be done at compile-time in the future - let pos = lowerBound(gProtocols, protocol) - gProtocols.insert(protocol, pos) - for i in 0 ..< gProtocols.len: - gProtocols[i].index = i +proc getRequestProtoName(fn: NimNode): NimNode = + return newLit("/ETH/BeaconChain/" & $fn.name & "/1/SSZ") -template libp2pProtocol*(name, version: string) {.pragma.} +proc init*[MsgType](T: type Responder[MsgType], + peer: Peer, stream: P2PStream): T = + T(UntypedResponder(peer: peer, stream: stream)) -proc getRequestProtoName(fn: NimNode): NimNode = - # `getCustomPragmaVal` doesn't work yet on regular nnkProcDef nodes - # (TODO: file as an issue) - - let pragmas = fn.pragma - if pragmas.kind == nnkPragma and pragmas.len > 0: - for pragma in pragmas: - if pragma.len > 0 and $pragma[0] == "libp2pProtocol": - return pragma[1] - - error "All stream opening procs must have the 'libp2pProtocol' pragma specified.", fn - -macro p2pProtocolImpl(name: static[string], - version: static[uint], - body: untyped, - timeout: static[int] = defaultOutgoingReqTimeout, - shortName: static[string] = "", - peerState = type(nil), - networkState = type(nil)): untyped = - ## The macro used to defined P2P sub-protocols. See README. +proc implementSendProcBody(sendProc: SendProc) = + let + msg = sendProc.msg + UntypedResponder = bindSym "UntypedResponder" + await = ident "await" + + proc sendCallGenerator(peer, bytes: NimNode): NimNode = + if msg.kind != msgResponse: + let msgProto = getRequestProtoName(msg.procDef) + case msg.kind + of msgRequest: + let + timeout = msg.timeoutParam[0] + ResponseRecord = msg.response.recIdent + quote: + makeEth2Request(`peer`, `msgProto`, `bytes`, + `ResponseRecord`, `timeout`) + of msgHandshake: + let + timeout = msg.timeoutParam[0] + HandshakeRecord = msg.recIdent + quote: + exchangeHandshake(`peer`, `msgProto`, `bytes`, + `HandshakeRecord`, `timeout`) + else: + quote: sendMsg(`peer`, `msgProto`, `bytes`) + else: + quote: sendBytes(`UntypedResponder`(`peer`).stream, `bytes`) + + proc prependResponseCode(stream: NimNode): NimNode = + quote: append(`stream`, byte(Success)) + + let preSerializationStep = if msg.kind == msgResponse: + prependResponseCode + else: + nil + + sendProc.useStandardBody(preSerializationStep, nil, sendCallGenerator) + +proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = var - # XXX: deal with a Nim bug causing the macro params to be - # zero when they are captured by a closure: - defaultTimeout = timeout - protoName = name - nextId = -1 - protoNameIdent = ident(protoName) - outTypes = newNimNode(nnkStmtList) - outSendProcs = newNimNode(nnkStmtList) - outRecvProcs = newNimNode(nnkStmtList) - outProcRegistrations = newNimNode(nnkStmtList) - response = ident"response" - name_openStream = newTree(nnkPostfix, ident("*"), ident"openStream") - outputStream = ident"outputStream" - currentProtocolSym = ident"CurrentProtocol" - protocol = ident(protoName & "Protocol") - peerState = verifyStateType peerState.getType - networkState = verifyStateType networkState.getType - handshake = newNilLit() - disconnectHandler = newNilLit() - Format = ident"SSZ" - Option = bindSym "Option" - UntypedResponse = bindSym "UntypedResponse" - Response = bindSym "Response" + Format = ident "SSZ" + Responder = bindSym "Responder" DaemonAPI = bindSym "DaemonAPI" P2PStream = ident "P2PStream" - # XXX: Binding the int type causes instantiation failure for some reason - # Int = bindSym "int" - Int = ident "int" - Void = ident "void" + OutputStream = bindSym "OutputStream" Peer = bindSym "Peer" - writeField = bindSym "writeField" - createNetworkState = bindSym "createNetworkState" - createPeerState = bindSym "createPeerState" - getOutput = bindSym "getOutput" + Eth2Node = bindSym "Eth2Node" messagePrinter = bindSym "messagePrinter" + milliseconds = bindSym "milliseconds" + registerMsg = bindSym "registerMsg" initProtocol = bindSym "initProtocol" - getRecipient = bindSym "getRecipient" - peerFromStream = bindSym "peerFromStream" - makeEth2Request = bindSym "makeEth2Request" - sendMsg = bindSym "sendMsg" - sendBytes = bindSym "sendBytes" - getState = bindSym "getState" - getNetworkState = bindSym "getNetworkState" - resolveNextMsgFutures = bindSym "resolveNextMsgFutures" - - proc augmentUserHandler(userHandlerProc: NimNode, - msgKind = msgNotification, - extraDefinitions: NimNode = nil) = - ## Turns a regular proc definition into an async proc and adds - ## the helpers for accessing the peer and network protocol states. - - userHandlerProc.addPragma ident"gcsafe" - userHandlerProc.addPragma ident"async" - - # We allow the user handler to use `openarray` params, but we turn - # those into sequences to make the `async` pragma happy. - for i in 1 ..< userHandlerProc.params.len: - var param = userHandlerProc.params[i] - param[^2] = chooseFieldType(param[^2]) - - var userHandlerDefinitions = newStmtList() - - userHandlerDefinitions.add quote do: - type `currentProtocolSym` = `protoNameIdent` - - if extraDefinitions != nil: - userHandlerDefinitions.add extraDefinitions - - # Define local accessors for the peer and the network protocol states - # inside each user message handler proc (e.g. peer.state.foo = bar) - if peerState != nil: - userHandlerDefinitions.add quote do: - template state(p: `Peer`): `peerState` = - cast[`peerState`](`getState`(p, `protocol`)) - - if networkState != nil: - userHandlerDefinitions.add quote do: - template networkState(p: `Peer`): `networkState` = - cast[`networkState`](`getNetworkState`(p.network, `protocol`)) - - userHandlerProc.body.insert 0, userHandlerDefinitions - - proc liftEventHandler(doBlock: NimNode, handlerName: string): NimNode = - ## Turns a "named" do block to a regular async proc - ## (e.g. onPeerConnected do ...) - result = newTree(nnkProcDef) - doBlock.copyChildrenTo(result) - result.name = genSym(nskProc, protoName & handlerName) - augmentUserHandler result - outRecvProcs.add result - - proc addMsgHandler(n: NimNode, msgKind = msgNotification, - responseRecord: NimNode = nil): NimNode = - if n[0].kind == nnkPostfix: - macros.error("p2pProcotol procs are public by default. " & - "Please remove the postfix `*`.", n) - - inc nextId + bindSymOp = bindSym "bindSym" + errVar = ident "err" + msgVar = ident "msg" + msgBytesVar = ident "msgBytes" + daemonVar = ident "daemon" + await = ident "await" + + p.useRequestIds = false + new result + + result.PeerType = Peer + result.NetworkType = Eth2Node + result.registerProtocol = bindSym "registerProtocol" + result.setEventHandlers = bindSym "setEventHandlers" + result.SerializationFormat = Format + result.ResponderType = Responder + + result.afterProtocolInit = proc (p: P2PProtocol) = + p.onPeerConnected.params.add newIdentDefs(streamVar, P2PStream) + + result.implementMsg = proc (msg: Message) = + let + protocol = msg.protocol + msgName = $msg.ident + msgNameLit = newLit msgName + msgRecName = msg.recIdent + + if msg.procDef.body.kind != nnkEmpty and msg.kind == msgRequest: + # Request procs need an extra param - the stream where the response + # should be written: + msg.userHandler.params.insert(2, newIdentDefs(streamVar, P2PStream)) + msg.initResponderCall.add streamVar + + ## + ## Implemenmt Thunk + ## + var thunkName = ident(msgName & "_thunk") let - msgIdent = n.name - msgName = $n.name - - var - userPragmas = n.pragma - - # variables used in the sending procs - msgRecipient = ident"msgRecipient" - sendTo = ident"sendTo" - writer = ident"writer" - recordStartMemo = ident"recordStartMemo" - reqTimeout: NimNode - appendParams = newNimNode(nnkStmtList) - paramsToWrite = newSeq[NimNode](0) - msgId = newLit(nextId) - - # variables used in the receiving procs - receivedMsg = ident"msg" - daemon = ident "daemon" - stream = ident "stream" - await = ident "await" - peerIdent = ident "peer" - tracing = newNimNode(nnkStmtList) - - # nodes to store the user-supplied message handling proc if present - userHandlerProc: NimNode = nil - userHandlerCall: NimNode = nil - awaitUserHandler = newStmtList() - - # a record type associated with the message - msgRecord = newIdentNode(msgName & "Obj") - msgRecordFields = newTree(nnkRecList) - msgRecordBody = newTree(nnkObjectTy, - newEmptyNode(), - newEmptyNode(), - msgRecordFields) - - result = msgRecord - - if msgKind == msgRequest: - # If the request proc has a default timeout specified, remove it from - # the signature for now so we can generate the `thunk` proc without it. - # The parameter will be added back later only for to the sender proc. - # When the timeout is not specified, we use a default one. - reqTimeout = popTimeoutParam(n) - if reqTimeout == nil: - reqTimeout = newTree(nnkIdentDefs, - ident"timeout", - Int, newLit(defaultTimeout)) - - if n.body.kind != nnkEmpty: - # Implement the receiving thunk proc that deserialzed the - # message parameters and calls the user proc: - userHandlerProc = n.copyNimTree - userHandlerProc.name = genSym(nskProc, msgName) - - # This is the call to the user supplied handler. - # Here we add only the initial params, the rest will be added later. - userHandlerCall = newCall(userHandlerProc.name) - # When there is a user handler, it must be awaited in the thunk proc. - # Above, by default `awaitUserHandler` is set to a no-op statement list. - awaitUserHandler = newCall(await, userHandlerCall) - - var extraDefs: NimNode - if msgKind == msgRequest: - # Request procs need an extra param - the stream where the response - # should be written: - userHandlerProc.params.insert(1, newIdentDefs(stream, P2PStream)) - userHandlerCall.add stream - let peer = userHandlerProc.params[2][0] - extraDefs = quote do: - # Jump through some hoops to work aroung - # https://github.com/nim-lang/Nim/issues/6248 - let `response` = `Response`[`responseRecord`]( - `UntypedResponse`(peer: `peer`, stream: `stream`)) - - # Resolve the Eth2Peer from the LibP2P data received in the thunk - userHandlerCall.add peerIdent - - augmentUserHandler userHandlerProc, msgKind, extraDefs - outRecvProcs.add userHandlerProc - - elif msgName == "status": - awaitUserHandler = quote do: - `await` `handshake`(`peerIdent`, `stream`) - - for param, paramType in n.typedParams(skip = 1): - paramsToWrite.add param - - # Each message has a corresponding record type. - # Here, we create its fields one by one: - msgRecordFields.add newTree(nnkIdentDefs, - newTree(nnkPostfix, ident("*"), param), # The fields are public - chooseFieldType(paramType), # some types such as openarray - # are automatically remapped - newEmptyNode()) - - # If there is user message handler, we'll place a call to it by - # unpacking the fields of the received message: - if userHandlerCall != nil: - userHandlerCall.add quote do: get(`receivedMsg`).`param` # newDotExpr(newCall("get", receivedMsg), param) - - when tracingEnabled: - tracing = quote do: - logReceivedMsg(`stream`.peer, `receivedMsg`.get) - - let requestDataTimeout = newLit(defaultIncomingReqTimeout) - - let thunkName = ident(msgName & "_thunk") - var thunkProc = quote do: - proc `thunkName`(`daemon`: `DaemonAPI`, `stream`: `P2PStream`) {.async, gcsafe.} = - var `receivedMsg` = `await` readMsg(`stream`, `msgRecord`, `requestDataTimeout`) - if `receivedMsg`.isNone: - # TODO: This peer is misbehaving, perhaps we should penalize him somehow + requestDataTimeout = newCall(milliseconds, newLit(defaultIncomingReqTimeout)) + awaitUserHandler = msg.genAwaitUserHandler(msgVar, [peerVar, streamVar]) + + let tracing = when tracingEnabled: + quote: logReceivedMsg(`streamVar`.peer, `msgVar`.get) + else: + newStmtList() + + msg.defineThunk quote do: + proc `thunkName`(`daemonVar`: `DaemonAPI`, + `streamVar`: `P2PStream`) {.async, gcsafe.} = + let + `deadlineVar` = sleepAsync `requestDataTimeout` + `msgBytesVar` = `await` readMsgBytes(`streamVar`, false, `deadlineVar`) + `peerVar` = peerFromStream(`daemonVar`, `streamVar`) + + if `msgBytesVar`.len == 0: + `await` sendErrorResponse(`peerVar`, `streamVar`, ServerError, + "Exceeded read timeout for a request") return - let `peerIdent` = `peerFromStream`(`daemon`, `stream`) - `tracing` - `awaitUserHandler` - `resolveNextMsgFutures`(`peerIdent`, get(`receivedMsg`)) - - for p in userPragmas: - thunkProc.addPragma p - - outRecvProcs.add thunkProc - - outTypes.add quote do: - # This is a type featuring a single field for each message param: - type `msgRecord`* = `msgRecordBody` - - # Add a helper template for accessing the message type: - # e.g. p2p.hello: - template `msgIdent`*(T: type `protoNameIdent`): type = `msgRecord` - template msgId*(T: type `msgRecord`): int = `msgId` - template msgProtocol*(T: type `msgRecord`): type = `protoNameIdent` - - var msgSendProc = n - let msgSendProcName = n.name - outSendProcs.add msgSendProc - - # TODO: check that the first param has the correct type - msgSendProc.params[1][0] = sendTo - if nextId == 0: msgSendProc.params[1][1] = P2PStream - msgSendProc.addPragma ident"gcsafe" - - # Add a timeout parameter for all request procs - case msgKind - of msgRequest: - msgSendProc.params.add reqTimeout - of msgResponse: - # A response proc must be called with a response object that originates - # from a certain request. Here we change the Peer parameter at position - # 1 to the correct strongly-typed ResponseType. The incoming procs still - # gets the normal Peer paramter. - let ResponseType = newTree(nnkBracketExpr, Response, msgRecord) - msgSendProc.params[1][1] = ResponseType - outSendProcs.add quote do: - template send*(r: `ResponseType`, args: varargs[untyped]): auto = - `msgSendProcName`(r, args) - else: discard - - # We change the return type of the sending proc to a Future. - # If this is a request proc, the future will return the response record. - let rt = case msgKind - of msgRequest: newTree(nnkBracketExpr, Option, responseRecord) - of msgResponse, msgNotification: Void - msgSendProc.params[0] = newTree(nnkBracketExpr, ident("Future"), rt) - - let msgBytes = ident"msgBytes" - - # Make the send proc public - msgSendProc.name = newTree(nnkPostfix, ident("*"), msgSendProc.name) - - let initWriter = quote do: - var `outputStream` = init OutputStream - var `writer` = init(WriterType(`Format`), `outputStream`) - var `recordStartMemo` = beginRecord(`writer`, `msgRecord`) - - for param in paramsToWrite: - appendParams.add newCall(writeField, writer, newLit($param), param) - - when tracingEnabled: - appendParams.add logSentMsgFields(msgRecipient, protocol, msgName, paramsToWrite) - - let finalizeRequest = quote do: - endRecord(`writer`, `recordStartMemo`) - let `msgBytes` = `getOutput`(`outputStream`) - - var msgProto = newLit("") - let sendCall = - if msgKind != msgResponse: - msgProto = getRequestProtoName(n) - - when false: - var openStreamProc = n.copyNimTree - var openStreamProc.name = name_openStream - openStreamProc.params.insert 1, newIdentDefs(ident"T", msgRecord) - - if msgKind == msgRequest: - let timeout = reqTimeout[0] - quote: `makeEth2Request`(`msgRecipient`, `msgProto`, `msgBytes`, - `responseRecord`, `timeout`) - elif nextId == 0: - quote: `sendBytes`(`sendTo`, `msgBytes`) - else: - quote: `sendMsg`(`msgRecipient`, `msgProto`, `msgBytes`) - else: - quote: `sendBytes`(`UntypedResponse`(`sendTo`).stream, `msgBytes`) - - msgSendProc.body = quote do: - let `msgRecipient` = `getRecipient`(`sendTo`) - `initWriter` - `appendParams` - `finalizeRequest` - return `sendCall` - - outProcRegistrations.add( - newCall(bindSym("registerMsg"), - protocol, - newLit(msgName), - thunkName, - msgProto, - newTree(nnkBracketExpr, messagePrinter, msgRecord))) - - outTypes.add quote do: - # Create a type acting as a pseudo-object representing the protocol - # (e.g. p2p) - type `protoNameIdent`* = object - - if peerState != nil: - outTypes.add quote do: - template State*(P: type `protoNameIdent`): type = `peerState` - - if networkState != nil: - outTypes.add quote do: - template NetworkState*(P: type `protoNameIdent`): type = `networkState` - - for n in body: - case n.kind - of {nnkCall, nnkCommand}: - if eqIdent(n[0], "nextID"): - discard - elif eqIdent(n[0], "requestResponse"): - # `requestResponse` can be given a block of 2 or more procs. - # The last one is considered to be a response message, while - # all preceeding ones are requests triggering the response. - # The system makes sure to automatically insert a hidden `reqId` - # parameter used to discriminate the individual messages. - block processReqResp: - if n.len == 2 and n[1].kind == nnkStmtList: - var procs = newSeq[NimNode](0) - for def in n[1]: - if def.kind == nnkProcDef: - procs.add(def) - if procs.len > 1: - let responseRecord = addMsgHandler(procs[^1], - msgKind = msgResponse) - for i in 0 .. procs.len - 2: - discard addMsgHandler(procs[i], - msgKind = msgRequest, - responseRecord = responseRecord) - - # we got all the way to here, so everything is fine. - # break the block so it doesn't reach the error call below - break processReqResp - macros.error("requestResponse expects a block with at least two proc definitions") - elif eqIdent(n[0], "onPeerConnected"): - var handshakeProc = liftEventHandler(n[1], "Handshake") - handshakeProc.params.add newIdentDefs(ident"handshakeStream", P2PStream) - handshake = handshakeProc.name - elif eqIdent(n[0], "onPeerDisconnected"): - disconnectHandler = liftEventHandler(n[1], "PeerDisconnect").name - else: - macros.error(repr(n) & " is not a recognized call in P2P protocol definitions", n) - of nnkProcDef: - discard addMsgHandler(n) - of nnkCommentStmt: - discard + var `msgVar`: `msgRecName` + try: + `msgVar` = decode(`Format`, `msgBytesVar`, `msgRecName`) + except SerializationError as `errVar`: + `await` sendErrorResponse(`peerVar`, `streamVar`, `errVar`, + `msgNameLit`, `msgBytesVar`) + return + try: + `tracing` + `awaitUserHandler` + resolveNextMsgFutures(`peerVar`, `msgVar`) + except CatchableError as `errVar`: + `await` sendErrorResponse(`peerVar`, `streamVar`, ServerError, `errVar`.msg) + + ## + ## Implement Senders and Handshake + ## + if msg.kind == msgHandshake: + # In LibP2P protocols, the handshake thunk is special. Instead of directly + # deserializing the incoming message and calling the user-supplied handler, + # we execute the `onPeerConnected` handler instead. + # + # The `onPeerConnected` handler is executed symmetrically for both peers + # and it's expected that one of its very first steps would be to send the + # handshake and then await the same from the other side. We call this step + # "handshakeExchanger". + # + # For the initiating peer, the handshakeExchanger opens a stream and sends + # a regular request through it, but on the receiving side, it just setups + # a future and call the lower-level thunk that will complete it. + # + let + handshake = msg.protocol.onPeerConnected + lowLevelThunkName = $thunkName + + if handshake.isNil: + macros.error "A LibP2P protocol with a handshake must also include an " & + "`onPeerConnected` handler.", msg.procDef + + # We must generate a forward declaration for the `onPeerConnected` handler, + # so we can call it from the thunk proc: + let handshakeProcName = handshake.name + msg.protocol.outRecvProcs.add quote do: + proc `handshakeProcName`(`peerVar`: `Peer`, + `streamVar`: `P2PStream`) {.async, gcsafe.} + + # Here we replace the 'thunkProc' that will be registered as a handler + # for incoming messages: + thunkName = ident(msgName & "_handleConnection") + + msg.protocol.outRecvProcs.add quote do: + proc `thunkName`(`daemonVar`: `DaemonAPI`, + `streamVar`: `P2PStream`) {.async, gcsafe.} = + let `peerVar` = peerFromStream(`daemonVar`, `streamVar`) + try: + `await` `handshakeProcName`(`peerVar`, `streamVar`) + except SerializationError as err: + debug "Failed to decode message", + err = err.formatMsg(""), + msg = `msgNameLit`, + peer = $(`streamVar`.peer) + `await` disconnect(`peerVar`, FaultOrError) + except CatchableError as err: + debug "Failed to complete handshake", err = err.msg + `await` disconnect(`peerVar`, FaultOrError) + + var + handshakeSerializer = msg.createSerializer() + handshakeSerializerName = newLit($handshakeSerializer.name) + handshakeExchanger = msg.createSendProc(nnkMacroDef) + paramsArray = newTree(nnkBracket).appendAllParams(handshakeExchanger.def) + handshakeTypeName = newLit($msg.recIdent) + getAst = ident "getAst" + res = ident "result" + + handshakeExchanger.setBody quote do: + let + stream = ident "stream" + outputStreamVar = ident "outputStream" + lowLevelThunk = ident `lowLevelThunkName` + HandshakeType = ident `handshakeTypeName` + params = `paramsArray` + peer = params[0] + timeout = params[^1] + handshakeSerializationCall = newCall(`bindSymOp` `handshakeSerializerName`, params) + + handshakeSerializationCall[1] = outputStreamVar + handshakeSerializationCall.del(handshakeSerializationCall.len - 1) + + `res` = `getAst`(handshakeImpl(outputStreamVar, handshakeSerializationCall, + lowLevelThunk, HandshakeType, + peer, stream, timeout)) + + when defined(debugMacros) or defined(debugHandshake): + echo "---- Handshake implementation ----" + echo repr(`res`) else: - macros.error("illegal syntax in a P2P protocol definition", n) - - let peerInit = if peerState == nil: newNilLit() - else: newTree(nnkBracketExpr, createPeerState, peerState) - - let netInit = if networkState == nil: newNilLit() - else: newTree(nnkBracketExpr, createNetworkState, networkState) - - result = newNimNode(nnkStmtList) - result.add outTypes - result.add quote do: - # One global variable per protocol holds the protocol run-time data - var p = `initProtocol`(`protoName`, `peerInit`, `netInit`) - var `protocol` = addr p - - # The protocol run-time data is available as a pseudo-field - # (e.g. `p2p.protocolInfo`) - template protocolInfo*(P: type `protoNameIdent`): ProtocolInfo = `protocol` - - result.add outSendProcs, outRecvProcs, outProcRegistrations - result.add quote do: - setEventHandlers(`protocol`, `handshake`, `disconnectHandler`) - - result.add newCall(bindSym("registerProtocol"), protocol) - - when defined(debugP2pProtocol) or defined(debugMacros): - echo repr(result) - -macro p2pProtocol*(protocolOptions: untyped, body: untyped): untyped = - let protoName = $(protocolOptions[0]) - result = protocolOptions - result[0] = bindSym"p2pProtocolImpl" - result.add(newTree(nnkExprEqExpr, - ident("name"), - newLit(protoName))) - result.add(newTree(nnkExprEqExpr, - ident("body"), - body)) - -proc makeMessageHandler[MsgType](msgHandler: proc(msg: MsgType)): P2PPubSubCallback = - result = proc(api: DaemonAPI, ticket: PubsubTicket, msg: PubSubMessage): Future[bool] {.async.} = - msgHandler SSZ.decode(msg.data, MsgType) - return true - -proc subscribe*[MsgType](node: EthereumNode, - topic: string, - msgHandler: proc(msg: MsgType)) {.async.} = - discard await node.daemon.pubsubSubscribe(topic, makeMessageHandler(msgHandler)) - -proc broadcast*(node: Eth2Node, topic: string, msg: auto) {.async.} = - await node.daemon.pubsubPublish(topic, SSZ.encode(msg)) + var sendProc = msg.createSendProc() + implementSendProcBody sendProc + + protocol.outProcRegistrations.add( + newCall(registerMsg, + protocol.protocolInfoVar, + msgNameLit, + thunkName, + getRequestProtoName(msg.procDef), + newTree(nnkBracketExpr, messagePrinter, msgRecName))) + + result.implementProtocolInit = proc (p: P2PProtocol): NimNode = + return newCall(initProtocol, newLit(p.name), p.peerInit, p.netInit) diff --git a/beacon_chain/libp2p_backends_common.nim b/beacon_chain/libp2p_backends_common.nim new file mode 100644 index 0000000000..d88b874f8c --- /dev/null +++ b/beacon_chain/libp2p_backends_common.nim @@ -0,0 +1,74 @@ +# included from libp2p_backend + +template `$`*(peer: Peer): string = $peer.id + +chronicles.formatIt(Peer): $it + +proc init*(T: type Peer, network: Eth2Node, id: PeerID): Peer {.gcsafe.} + +proc getPeer*(node: Eth2Node, peerId: PeerID): Peer {.gcsafe.} = + result = node.peers.getOrDefault(peerId) + if result == nil: + result = Peer.init(node, peerId) + node.peers[peerId] = result + +proc peerFromStream(daemon: DaemonAPI, stream: P2PStream): Peer {.gcsafe.} = + Eth2Node(daemon.userData).getPeer(stream.peer) + +proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false) {.async.} = + # TODO: How should we notify the other peer? + if peer.connectionState notin {Disconnecting, Disconnected}: + peer.connectionState = Disconnecting + await peer.network.daemon.disconnect(peer.id) + peer.connectionState = Disconnected + peer.network.peers.del(peer.id) + +template raisePeerDisconnected(msg: string, r: DisconnectionReason) = + var e = newException(PeerDisconnected, msg) + e.reason = r + raise e + +proc disconnectAndRaise(peer: Peer, + reason: DisconnectionReason, + msg: string) {.async.} = + let r = reason + await peer.disconnect(r) + raisePeerDisconnected(msg, r) + +template reraiseAsPeerDisconnected(peer: Peer, errMsgExpr: static string, + reason = FaultOrError): auto = + const errMsg = errMsgExpr + debug errMsg, err = getCurrentExceptionMsg() + disconnectAndRaise(peer, reason, errMsg) + +proc getCompressedMsgId*(MsgType: type): CompressedMsgId = + mixin msgId, msgProtocol, protocolInfo + (protocolIdx: MsgType.msgProtocol.protocolInfo.index, + methodId: MsgType.msgId) + +proc nextMsg*(peer: Peer, MsgType: type): Future[MsgType] = + ## This procs awaits a specific P2P message. + ## Any messages received while waiting will be dispatched to their + ## respective handlers. The designated message handler will also run + ## to completion before the future returned by `nextMsg` is resolved. + let awaitedMsgId = getCompressedMsgId(MsgType) + let f = getOrDefault(peer.awaitedMessages, awaitedMsgId) + if not f.isNil: + return Future[MsgType](f) + + initFuture result + peer.awaitedMessages[awaitedMsgId] = result + +proc registerProtocol(protocol: ProtocolInfo) = + # TODO: This can be done at compile-time in the future + let pos = lowerBound(gProtocols, protocol) + gProtocols.insert(protocol, pos) + for i in 0 ..< gProtocols.len: + gProtocols[i].index = i + +proc setEventHandlers(p: ProtocolInfo, + handshake: HandshakeStep, + disconnectHandler: DisconnectionHandler) = + p.handshake = handshake + p.disconnectHandler = disconnectHandler + diff --git a/beacon_chain/libp2p_spec_backend.nim b/beacon_chain/libp2p_spec_backend.nim new file mode 100644 index 0000000000..98084114e1 --- /dev/null +++ b/beacon_chain/libp2p_spec_backend.nim @@ -0,0 +1,645 @@ +import + tables, deques, options, algorithm, std_shims/[macros_shim, tables_shims], + ranges/ptr_arith, chronos, chronicles, serialization, faststreams/input_stream, + eth/async_utils, eth/p2p/p2p_protocol_dsl, libp2p/daemon/daemonapi, + ssz + +export + daemonapi, p2pProtocol, serialization, ssz + +const + # Compression nibble + NoCompression* = byte 0 + + # Encoding nibble + SszEncoding* = byte 1 + + beaconChainProtocol = "/eth/serenity/beacon/rpc/1" + +type + Eth2Node* = ref object of RootObj + daemon*: DaemonAPI + peers*: Table[PeerID, Peer] + protocolStates*: seq[RootRef] + + EthereumNode = Eth2Node # needed for the definitions in p2p_backends_helpers + + Peer* = ref object + network*: Eth2Node + id*: PeerID + lastReqId*: uint64 + rpcStream*: P2PStream + connectionState*: ConnectionState + awaitedMessages: Table[CompressedMsgId, FutureBase] + outstandingRequests*: Table[uint64, OutstandingRequest] + protocolStates*: seq[RootRef] + maxInactivityAllowed*: Duration + + ConnectionState* = enum + None, + Connecting, + Connected, + Disconnecting, + Disconnected + + DisconnectionReason* = enum + ClientShutdown = 1 + IrrelevantNetwork + FaultOrError + + CompressedMsgId = tuple + protocolIdx, methodId: int + + ResponderWithId*[MsgType] = object + peer*: Peer + reqId*: uint64 + + Response*[MsgType] = distinct Peer + + # ----------------------------------------- + + ResponseCode* = enum + NoError + ParseError = 10 + InvalidRequest = 20 + MethodNotFound = 30 + ServerError = 40 + + OutstandingRequest* = object + id*: uint64 + future*: FutureBase + timeoutAt*: Moment + responseThunk*: ThunkProc + + ProtocolConnection* = object + stream*: P2PStream + protocolInfo*: ProtocolInfo + + MessageInfo* = object + id*: int + name*: string + + # Private fields: + thunk*: ThunkProc + printer*: MessageContentPrinter + nextMsgResolver*: NextMsgResolver + requestResolver*: RequestResolver + + ProtocolInfoObj* = object + name*: string + version*: int + messages*: seq[MessageInfo] + index*: int # the position of the protocol in the + # ordered list of supported protocols + + # Private fields: + peerStateInitializer*: PeerStateInitializer + networkStateInitializer*: NetworkStateInitializer + handshake*: HandshakeStep + disconnectHandler*: DisconnectionHandler + + ProtocolInfo* = ptr ProtocolInfoObj + + SpecOuterMsgHeader {.packed.} = object + compression {.bitsize: 4.}: uint + encoding {.bitsize: 4.}: uint + msgLen: uint64 + + SpecInnerMsgHeader {.packed.} = object + reqId: uint64 + methodId: uint16 + + ErrorResponse {.packed.} = object + outerHeader: SpecOuterMsgHeader + innerHeader: SpecInnerMsgHeader + + PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe.} + NetworkStateInitializer* = proc(network: Eth2Node): RootRef {.gcsafe.} + + HandshakeStep* = proc(peer: Peer, handshakeStream: P2PStream): Future[void] {.gcsafe.} + DisconnectionHandler* = proc(peer: Peer): Future[void] {.gcsafe.} + + ThunkProc* = proc(peer: Peer, + stream: P2PStream, + reqId: uint64, + reqFuture: FutureBase, + msgData: ByteStreamVar): Future[void] {.gcsafe.} + + MessageContentPrinter* = proc(msg: pointer): string {.gcsafe.} + NextMsgResolver* = proc(msg: pointer, future: FutureBase) {.gcsafe.} + RequestResolver* = proc(msg: pointer, future: FutureBase) {.gcsafe.} + + Bytes = seq[byte] + + InvalidMsgIdError = object of InvalidMsgError + + PeerDisconnected* = object of P2PBackendError + reason*: DisconnectionReason + + PeerLoopExitReason = enum + Success + UnsupportedCompression + UnsupportedEncoding + ProtocolViolation + InactivePeer + InternalError + +const + HandshakeTimeout = FaultOrError + BreachOfProtocol* = FaultOrError + # TODO: We should lobby for more disconnection reasons. + +template isOdd(val: SomeInteger): bool = + type T = type(val) + (val and T(1)) != 0 + +proc init(T: type SpecOuterMsgHeader, + compression, encoding: byte, msgLen: uint64): T = + T(compression: compression, encoding: encoding, msgLen: msgLen) + +proc readPackedObject(stream: P2PStream, T: type): Future[T] {.async.} = + await stream.transp.readExactly(addr result, sizeof result) + +proc appendPackedObject(stream: OutputStreamVar, value: auto) = + let valueAsBytes = cast[ptr byte](unsafeAddr(value)) + stream.append makeOpenArray(valueAsBytes, sizeof(value)) + +proc getThunk(protocol: ProtocolInfo, methodId: uint16): ThunkProc = + if methodId.int >= protocol.messages.len: return nil + protocol.messages[methodId.int].thunk + +include eth/p2p/p2p_backends_helpers +include eth/p2p/p2p_tracing +include libp2p_backends_common + +proc handleConnectingBeaconChainPeer(daemon: DaemonAPI, stream: P2PStream) {.async, gcsafe.} + +proc init*(T: type Eth2Node, daemon: DaemonAPI): Future[Eth2Node] {.async.} = + new result + result.daemon = daemon + result.daemon.userData = result + result.peers = initTable[PeerID, Peer]() + + newSeq result.protocolStates, allProtocols.len + for proto in allProtocols: + if proto.networkStateInitializer != nil: + result.protocolStates[proto.index] = proto.networkStateInitializer(result) + + await daemon.addHandler(@[beaconChainProtocol], handleConnectingBeaconChainPeer) + +proc init*(T: type Peer, network: Eth2Node, id: PeerID): Peer = + new result + result.id = id + result.network = network + result.awaitedMessages = initTable[CompressedMsgId, FutureBase]() + result.maxInactivityAllowed = 15.minutes # TODO: read this from the config + result.connectionState = None + newSeq result.protocolStates, allProtocols.len + for i in 0 ..< allProtocols.len: + let proto = allProtocols[i] + if proto.peerStateInitializer != nil: + result.protocolStates[i] = proto.peerStateInitializer(result) + +proc init*[MsgName](T: type ResponderWithId[MsgName], + peer: Peer, reqId: uint64): T = + T(peer: peer, reqId: reqId) + +proc sendMsg*(peer: Peer, data: Bytes) {.gcsafe, async.} = + try: + var unsentBytes = data.len + while true: + # TODO: this looks wrong. + # We are always trying to write the same data. + # Find all other places where such code is used. + unsentBytes -= await peer.rpcStream.transp.write(data) + if unsentBytes <= 0: return + except CatchableError: + await peer.disconnect(FaultOrError) + # this is usually a "(32) Broken pipe": + # FIXME: this exception should be caught somewhere in addMsgHandler() and + # sending should be retried a few times + raise + +proc sendMsg*[T](responder: ResponderWithId[T], data: Bytes): Future[void] = + return sendMsg(responder.peer, data) + +proc sendErrorResponse(peer: Peer, reqId: uint64, + responseCode: ResponseCode): Future[void] = + var resp = ErrorResponse( + outerHeader: SpecOuterMsgHeader.init( + compression = NoCompression, + encoding = SszEncoding, + msgLen = uint64 sizeof(SpecInnerMsgHeader)), + innerHeader: SpecInnerMsgHeader( + reqId: reqId, + methodId: uint16(responseCode))) + + # TODO: don't allocate the Bytes sequence here + return peer.sendMsg @(makeOpenArray(cast[ptr byte](addr resp), sizeof resp)) + +proc recvAndDispatchMsg*(peer: Peer): Future[PeerLoopExitReason] {.async.} = + template fail(reason) = + return reason + + # For now, we won't try to handle the presence of multiple sub-protocols + # since the spec is not defining how they will be mapped to P2P streams. + doAssert allProtocols.len == 1 + + var + stream = peer.rpcStream + protocol = allProtocols[0] + + var outerHeader = await stream.readPackedObject(SpecOuterMsgHeader) + + if outerHeader.compression != NoCompression: + fail UnsupportedCompression + + if outerHeader.encoding != SszEncoding: + fail UnsupportedEncoding + + if outerHeader.msgLen <= SpecInnerMsgHeader.sizeof.uint64: + fail ProtocolViolation + + let + innerHeader = await stream.readPackedObject(SpecInnerMsgHeader) + reqId = innerHeader.reqId + + var msgContent = newSeq[byte](outerHeader.msgLen - SpecInnerMsgHeader.sizeof.uint64) + await stream.transp.readExactly(addr msgContent[0], msgContent.len) + + var msgContentStream = memoryStream(msgContent) + + if reqId.isOdd: + peer.outstandingRequests.withValue(reqId, req): + let thunk = req.responseThunk + let reqFuture = req.future + peer.outstandingRequests.del(reqId) + + try: + await thunk(peer, stream, reqId, reqFuture, msgContentStream) + except SerializationError: + debug "Error during deserialization", err = getCurrentExceptionMsg() + fail ProtocolViolation + except CatchableError: + # TODO + warn "" + do: + debug "Ignoring late or invalid response ID", peer, id = reqId + # TODO: skip the message + else: + let thunk = protocol.getThunk(innerHeader.methodId) + if thunk != nil: + try: + await thunk(peer, stream, reqId, nil, msgContentStream) + except SerializationError: + debug "Error during deserialization", err = getCurrentExceptionMsg() + fail ProtocolViolation + except CatchableError: + # TODO + warn "" + else: + debug "P2P request method not found", methodId = innerHeader.methodId + await peer.sendErrorResponse(reqId, MethodNotFound) + +proc dispatchMessages*(peer: Peer): Future[PeerLoopExitReason] {.async.} = + while true: + let dispatchedMsgFut = recvAndDispatchMsg(peer) + doAssert peer.maxInactivityAllowed.milliseconds > 0 + yield dispatchedMsgFut or sleepAsync(peer.maxInactivityAllowed) + if not dispatchedMsgFut.finished: + return InactivePeer + elif dispatchedMsgFut.failed: + error "Error in peer loop" + return InternalError + else: + let status = dispatchedMsgFut.read + if status == Success: continue + return status + +proc performProtocolHandshakes*(peer: Peer) {.async.} = + peer.initProtocolStates allProtocols + + # Please note that the ordering of operations here is important! + # + # We must first start all handshake procedures and give them a + # chance to send any initial packages they might require over + # the network and to yield on their `nextMsg` waits. + # + var subProtocolsHandshakes = newSeqOfCap[Future[void]](allProtocols.len) + for protocol in allProtocols: + if protocol.handshake != nil: + subProtocolsHandshakes.add((protocol.handshake)(peer, peer.rpcStream)) + + # The `dispatchMesssages` loop must be started after this. + # Otherwise, we risk that some of the handshake packets sent by + # the other peer may arrrive too early and be processed before + # the handshake code got a change to wait for them. + # + var messageProcessingLoop = peer.dispatchMessages() + messageProcessingLoop.callback = proc(p: pointer) {.gcsafe.} = + if messageProcessingLoop.failed: + debug "Ending dispatchMessages loop", peer, + err = messageProcessingLoop.error.msg + else: + debug "Ending dispatchMessages", peer, + exitCode = messageProcessingLoop.read + traceAsyncErrors peer.disconnect(ClientShutdown) + + # The handshake may involve multiple async steps, so we wait + # here for all of them to finish. + # + await all(subProtocolsHandshakes) + + peer.connectionState = Connected + debug "Peer connection initialized", peer + +proc initializeConnection*(peer: Peer) {.async.} = + let daemon = peer.network.daemon + try: + peer.connectionState = Connecting + peer.rpcStream = await daemon.openStream(peer.id, @[beaconChainProtocol]) + await performProtocolHandshakes(peer) + except CatchableError: + await reraiseAsPeerDisconnected(peer, "Failed to perform handshake") + +proc handleConnectingBeaconChainPeer(daemon: DaemonAPI, stream: P2PStream) {.async, gcsafe.} = + let peer = daemon.peerFromStream(stream) + peer.rpcStream = stream + peer.connectionState = Connecting + await performProtocolHandshakes(peer) + +proc resolvePendingFutures(peer: Peer, protocol: ProtocolInfo, + methodId: int, msg: pointer, reqFuture: FutureBase) = + + let msgId = (protocolIdx: protocol.index, methodId: methodId) + + if peer.awaitedMessages[msgId] != nil: + let msgInfo = protocol.messages[methodId] + msgInfo.nextMsgResolver(msg, peer.awaitedMessages[msgId]) + peer.awaitedMessages[msgId] = nil + + if reqFuture != nil and not reqFuture.finished: + protocol.messages[methodId].requestResolver(msg, reqFuture) + +proc initProtocol(name: string, version: int, + peerInit: PeerStateInitializer, + networkInit: NetworkStateInitializer): ProtocolInfoObj = + result.name = name + result.version = version + result.messages = @[] + result.peerStateInitializer = peerInit + result.networkStateInitializer = networkInit + +proc registerMsg(protocol: ProtocolInfo, + id: int, name: string, + thunk: ThunkProc, + printer: MessageContentPrinter, + requestResolver: RequestResolver, + nextMsgResolver: NextMsgResolver) = + if protocol.messages.len <= id: + protocol.messages.setLen(id + 1) + protocol.messages[id] = MessageInfo(id: id, + name: name, + thunk: thunk, + printer: printer, + requestResolver: requestResolver, + nextMsgResolver: nextMsgResolver) + +template applyDecorator(p: NimNode, decorator: NimNode) = + if decorator.kind != nnkNilLit: p.addPragma decorator + +proc prepareRequest(peer: Peer, + protocol: ProtocolInfo, + requestMethodId, responseMethodId: uint16, + stream: OutputStreamVar, + timeout: Duration, + responseFuture: FutureBase): DelayedWriteCursor = + assert peer != nil and + protocol != nil and + responseFuture != nil and + responseMethodId.int < protocol.messages.len + + doAssert timeout.milliseconds > 0 + + result = stream.delayFixedSizeWrite sizeof(SpecOuterMsgHeader) + + inc peer.lastReqId, 2 + let reqId = peer.lastReqId + + stream.appendPackedObject SpecInnerMsgHeader( + reqId: reqId, methodId: requestMethodId) + + template responseMsgInfo: auto = + protocol.messages[responseMethodId.int] + + let + requestResolver = responseMsgInfo.requestResolver + timeoutAt = Moment.fromNow(timeout) + + peer.outstandingRequests[reqId + 1] = OutstandingRequest( + id: reqId, + future: responseFuture, + timeoutAt: timeoutAt, + responseThunk: responseMsgInfo.thunk) + + proc timeoutExpired(udata: pointer) = + requestResolver(nil, responseFuture) + peer.outstandingRequests.del(reqId + 1) + + addTimer(timeoutAt, timeoutExpired, nil) + +proc prepareResponse(responder: ResponderWithId, + stream: OutputStreamVar): DelayedWriteCursor = + result = stream.delayFixedSizeWrite sizeof(SpecOuterMsgHeader) + + stream.appendPackedObject SpecInnerMsgHeader( + reqId: responder.reqId + 1, + methodId: uint16(Success)) + +proc prepareMsg(peer: Peer, methodId: uint16, + stream: OutputStreamVar): DelayedWriteCursor = + result = stream.delayFixedSizeWrite sizeof(SpecOuterMsgHeader) + + inc peer.lastReqId, 2 + stream.appendPackedObject SpecInnerMsgHeader( + reqId: peer.lastReqId, methodId: methodId) + +proc finishOuterHeader(headerCursor: DelayedWriteCursor) = + var outerHeader = SpecOuterMsgHeader.init( + compression = NoCompression, + encoding = SszEncoding, + msgLen = uint64(headerCursor.totalBytesWrittenAfterCursor)) + + headerCursor.endWrite makeOpenArray(cast[ptr byte](addr outerHeader), + sizeof outerHeader) + +proc implementSendProcBody(sendProc: SendProc) = + let + msg = sendProc.msg + delayedWriteCursor = ident "delayedWriteCursor" + peer = sendProc.peerParam + + proc preSerializationStep(stream: NimNode): NimNode = + case msg.kind + of msgRequest: + let + requestMethodId = newLit(msg.id) + responseMethodId = newLit(msg.response.id) + protocol = sendProc.msg.protocol.protocolInfoVar + timeout = sendProc.timeoutParam + + quote do: + var `delayedWriteCursor` = prepareRequest( + `peer`, `protocol`, `requestMethodId`, `responseMethodId`, + `stream`, `timeout`, `resultIdent`) + + of msgResponse: + quote do: + var `delayedWriteCursor` = prepareResponse(`peer`, `stream`) + + of msgHandshake, msgNotification: + let methodId = newLit(msg.id) + quote do: + var `delayedWriteCursor` = prepareMsg(`peer`, `methodId`, `stream`) + + proc postSerializationStep(stream: NimNode): NimNode = + newCall(bindSym "finishOuterHeader", delayedWriteCursor) + + proc sendCallGenerator(peer, bytes: NimNode): NimNode = + let + linkSendFailureToReqFuture = bindSym "linkSendFailureToReqFuture" + sendMsg = bindSym "sendMsg" + sendCall = newCall(sendMsg, peer, bytes) + + if msg.kind == msgRequest: + # In RLPx requests, the returned future was allocated here and passed + # to `prepareRequest`. It's already assigned to the result variable + # of the proc, so we just wait for the sending operation to complete + # and we return in a normal way. (the waiting is done, so we can catch + # any possible errors). + quote: `linkSendFailureToReqFuture`(`sendCall`, `resultIdent`) + else: + # In normal RLPx messages, we are returning the future returned by the + # `sendMsg` call. + quote: return `sendCall` + + sendProc.useStandardBody( + preSerializationStep, + postSerializationStep, + sendCallGenerator) + +proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = + let + Option = bindSym "Option" + Peer = bindSym "Peer" + EthereumNode = bindSym "EthereumNode" + + Format = ident "SSZ" + Response = bindSym "Response" + ResponderWithId = bindSym "ResponderWithId" + perProtocolMsgId = ident "perProtocolMsgId" + + mount = bindSym "mount" + + messagePrinter = bindSym "messagePrinter" + resolveFuture = bindSym "resolveFuture" + requestResolver = bindSym "requestResolver" + resolvePendingFutures = bindSym "resolvePendingFutures" + nextMsg = bindSym "nextMsg" + initProtocol = bindSym "initProtocol" + registerMsg = bindSym "registerMsg" + handshakeImpl = bindSym "handshakeImpl" + + stream = ident "stream" + protocol = ident "protocol" + response = ident "response" + reqFutureVar = ident "reqFuture" + msgContents = ident "msgContents" + receivedMsg = ident "receivedMsg" + + ProtocolInfo = bindSym "ProtocolInfo" + P2PStream = bindSym "P2PStream" + ByteStreamVar = bindSym "ByteStreamVar" + + new result + + result.registerProtocol = bindSym "registerProtocol" + result.setEventHandlers = bindSym "setEventHandlers" + result.PeerType = Peer + result.NetworkType = EthereumNode + result.SerializationFormat = Format + + p.useRequestIds = true + result.ReqIdType = ident "uint64" + result.ResponderType = ResponderWithId + + result.afterProtocolInit = proc (p: P2PProtocol) = + p.onPeerConnected.params.add newIdentDefs(ident"handshakeStream", P2PStream) + + result.implementMsg = proc (msg: Message) = + var + msgIdLit = newLit(msg.id) + msgRecName = msg.recIdent + msgIdent = msg.ident + msgName = $msgIdent + protocol = msg.protocol + + ## + ## Implemenmt Thunk + ## + let traceMsg = when tracingEnabled: + newCall(bindSym"logReceivedMsg", peer, receivedMsg) + else: + newStmtList() + + let callResolvePendingFutures = newCall( + resolvePendingFutures, peerVar, + protocol.protocolInfoVar, + msgIdLit, + newCall("addr", receivedMsg), + reqFutureVar) + + var userHandlerParams = @[peerVar] + if msg.kind == msgRequest: + userHandlerParams.add reqIdVar + + let + thunkName = ident(msgName & "_thunk") + awaitUserHandler = msg.genAwaitUserHandler(receivedMsg, userHandlerParams) + + msg.defineThunk quote do: + proc `thunkName`(`peerVar`: `Peer`, + `stream`: `P2PStream`, + `reqIdVar`: uint64, + `reqFutureVar`: FutureBase, + `msgContents`: `ByteStreamVar`) {.async, gcsafe.} = + var `receivedMsg` = `mount`(`Format`, `msgContents`, `msgRecName`) + `traceMsg` + `awaitUserHandler` + `callResolvePendingFutures` + + ## + ## Implement Senders and Handshake + ## + var sendProc = msg.createSendProc(isRawSender = (msg.kind == msgHandshake)) + implementSendProcBody sendProc + + if msg.kind == msgHandshake: + discard msg.createHandshakeTemplate(sendProc.def.name, handshakeImpl, nextMsg) + + protocol.outProcRegistrations.add( + newCall(registerMsg, + protocol.protocolInfoVar, + msgIdLit, + newLit(msgName), + thunkName, + newTree(nnkBracketExpr, messagePrinter, msgRecName), + newTree(nnkBracketExpr, requestResolver, msgRecName), + newTree(nnkBracketExpr, resolveFuture, msgRecName))) + + result.implementProtocolInit = proc (protocol: P2PProtocol): NimNode = + return newCall(initProtocol, + newLit(protocol.shortName), + newLit(protocol.version), + protocol.peerInit, protocol.netInit) + diff --git a/beacon_chain/request_manager.nim b/beacon_chain/request_manager.nim index c4ffa91200..8c12044e2e 100644 --- a/beacon_chain/request_manager.nim +++ b/beacon_chain/request_manager.nim @@ -5,7 +5,7 @@ import eth2_network, beacon_node_types, sync_protocol, eth/async_utils -proc init*(T: type RequestManager, network: EthereumNode): T = +proc init*(T: type RequestManager, network: Eth2Node): T = T(network: network) type diff --git a/beacon_chain/ssz.nim b/beacon_chain/ssz.nim index 2d49330ee7..3fd2aa1b77 100644 --- a/beacon_chain/ssz.nim +++ b/beacon_chain/ssz.nim @@ -40,6 +40,11 @@ serializationFormat SSZ, proc init*(T: type SszReader, stream: ByteStreamVar): T = result.stream = stream +proc mount*(F: type SSZ, stream: ByteStreamVar, T: type): T = + mixin readValue + var reader = init(SszReader, stream) + reader.readValue(T) + func toSSZType(x: Slot|Epoch): auto = x.uint64 func toSSZType(x: auto): auto = x @@ -167,11 +172,13 @@ proc writeValue*(w: var SszWriter, obj: auto) = when obj is ValidatorIndex|BasicType: w.stream.append obj.toSSZType().toBytesSSZ + elif obj is byte|char: + w.stream.append obj elif obj is enum: w.stream.append uint64(obj).toBytesSSZ else: let memo = w.beginRecord(obj.type) - when obj is seq|array|openarray: + when obj is seq|array|openarray|string: # If you get an error here that looks like: # type mismatch: got # you just used an unsigned int for an array index thinking you'd get diff --git a/beacon_chain/sync_protocol.nim b/beacon_chain/sync_protocol.nim index 1beea1da78..e66e616807 100644 --- a/beacon_chain/sync_protocol.nim +++ b/beacon_chain/sync_protocol.nim @@ -1,13 +1,9 @@ import - options, tables, sequtils, algorithm, + options, tables, sequtils, algorithm, sets, macros, chronicles, chronos, ranges/bitranges, spec/[datatypes, crypto, digest, helpers], eth/rlp, beacon_node_types, eth2_network, beacon_chain_db, block_pool, time, ssz -from beacon_node import onBeaconBlock - # Careful handling of beacon_node <-> sync_protocol - # to avoid recursive dependencies - type ValidatorChangeLogEntry* = object case kind*: ValidatorSetDeltaFlags @@ -19,7 +15,8 @@ type ValidatorSet = seq[Validator] BeaconSyncState* = ref object - networkId*: uint64 + networkId*: uint8 + chainId*: uint64 node*: BeaconNode db*: BeaconChainDB @@ -48,7 +45,7 @@ proc fromHeaderAndBody(b: var BeaconBlock, h: BeaconBlockHeader, body: BeaconBlo proc importBlocks(node: BeaconNode, blocks: openarray[BeaconBlock]) = for blk in blocks: - node.onBeaconBlock(blk) + node.onBeaconBlock(node, blk) info "Forward sync imported blocks", len = blocks.len proc mergeBlockHeadersAndBodies(headers: openarray[BeaconBlockHeader], bodies: openarray[BeaconBlockBody]): Option[seq[BeaconBlock]] = @@ -73,11 +70,12 @@ p2pProtocol BeaconSync(version = 1, shortName = "bcs", networkState = BeaconSyncState): - onPeerConnected do(peer: Peer): + onPeerConnected do (peer: Peer): let protocolVersion = 1 # TODO: Spec doesn't specify this yet node = peer.networkState.node networkId = peer.networkState.networkId + chainId = peer.networkState.networkId blockPool = node.blockPool finalizedHead = blockPool.finalizedHead headBlock = blockPool.head.blck @@ -85,12 +83,12 @@ p2pProtocol BeaconSync(version = 1, bestSlot = headBlock.slot latestFinalizedEpoch = finalizedHead.slot.slot_to_epoch() - let m = await handshake(peer, timeout = 10.seconds, - status(networkId, finalizedHead.blck.root, - latestFinalizedEpoch, bestRoot, bestSlot)) + let m = await peer.hello(networkId, chainId, finalizedHead.blck.root, + latestFinalizedEpoch, bestRoot, bestSlot, + timeout = 10.seconds) if m.networkId != networkId: - await peer.disconnect(UselessPeer) + await peer.disconnect(IrrelevantNetwork) return # TODO: onPeerConnected runs unconditionally for every connected peer, but we @@ -103,7 +101,7 @@ p2pProtocol BeaconSync(version = 1, let bestDiff = cmp((latestFinalizedEpoch, bestSlot), (m.latestFinalizedEpoch, m.bestSlot)) if bestDiff >= 0: # Nothing to do? - trace "Nothing to sync", peer = peer.remote + debug "Nothing to sync", peer else: # TODO: Check for WEAK_SUBJECTIVITY_PERIOD difference and terminate the # connection if it's too big. @@ -122,7 +120,7 @@ p2pProtocol BeaconSync(version = 1, if lastSlot <= s: info "Slot did not advance during sync", peer break - + s = lastSlot + 1 else: break @@ -130,16 +128,38 @@ p2pProtocol BeaconSync(version = 1, except CatchableError: warn "Failed to sync with peer", peer, err = getCurrentExceptionMsg() - proc status( + handshake: + proc hello( peer: Peer, - networkId: uint64, + networkId: uint8, + chainId: uint64, latestFinalizedRoot: Eth2Digest, latestFinalizedEpoch: Epoch, bestRoot: Eth2Digest, - bestSlot: Slot) {.libp2pProtocol("hello", "1.0.0").} + bestSlot: Slot) + + proc goodbye(peer: Peer, reason: DisconnectionReason) + + requestResponse: + proc getStatus( + peer: Peer, + sha: Eth2Digest, + userAgent: string, + timestamp: uint64) = + + # TODO: How should this be implemented? + # https://github.com/ethereum/eth2.0-specs/blob/dev/specs/networking/rpc-interface.md#get-status + await response.send(sha, userAgent, timestamp) + + proc status(peer: Peer, sha: Eth2Digest, userAgent: string, timestamp: uint64) + + nextId 10 requestResponse: - proc getBeaconBlockRoots(peer: Peer, fromSlot: Slot, maxRoots: int) = + proc getBeaconBlockRoots( + peer: Peer, + fromSlot: Slot, + maxRoots: int) = let maxRoots = min(MaxRootsToRequest, maxRoots) var s = fromSlot var roots = newSeqOfCap[(Eth2Digest, Slot)](maxRoots) @@ -161,7 +181,7 @@ p2pProtocol BeaconSync(version = 1, slot: Slot, maxHeaders: int, skipSlots: int, - backward: uint8) {.libp2pProtocol("rpc/beacon_block_headers", "1.0.0").} = + backward: uint8) = let maxHeaders = min(MaxHeadersToRequest, maxHeaders) var headers: seq[BeaconBlockHeader] let db = peer.networkState.db @@ -206,6 +226,8 @@ p2pProtocol BeaconSync(version = 1, proc beaconBlockHeaders(peer: Peer, blockHeaders: openarray[BeaconBlockHeader]) + # TODO move this at the bottom, because it's not in the spec yet, but it will + # consume a `method_id` requestResponse: proc getAncestorBlocks( peer: Peer, @@ -244,7 +266,7 @@ p2pProtocol BeaconSync(version = 1, requestResponse: proc getBeaconBlockBodies( peer: Peer, - blockRoots: openarray[Eth2Digest]) {.libp2pProtocol("rpc/beacon_block_bodies", "1.0.0").} = + blockRoots: openarray[Eth2Digest]) = # TODO: Validate blockRoots.len var bodies = newSeqOfCap[BeaconBlockBody](blockRoots.len) let db = peer.networkState.db diff --git a/beacon_chain/version.nim b/beacon_chain/version.nim index 146afd6263..dc97dc6730 100644 --- a/beacon_chain/version.nim +++ b/beacon_chain/version.nim @@ -1,5 +1,16 @@ +type + NetworkBackendType* = enum + libp2pSpecBackend + libp2pNativeBackend + rlpxBackend + const - useRLPx* = not defined(withLibP2P) + network_type {.strdefine.} = "libp2p_native" + + networkBackend* = when network_type == "rlpx": rlpxBackend + elif network_type == "libp2p_spec": libp2pSpecBackend + elif network_type == "libp2p_native": libp2pNativeBackend + else: {.fatal: "The 'network_type' should be one of 'libp2p_spec', 'libp2p_native' or 'rlpx'" .} const versionMajor* = 0 @@ -15,4 +26,5 @@ template versionAsStr*: string = $versionMajor & "." & $versionMinor & "." & $versionBuild proc fullVersionStr*: string = - versionAsStr & (if useRLPx: " rlpx" else: " libp2p") + versionAsStr & "_" & network_type + diff --git a/scripts/build_testnet_node.sh b/scripts/build_testnet_node.sh index 81c8898b15..a3b2a84c2a 100755 --- a/scripts/build_testnet_node.sh +++ b/scripts/build_testnet_node.sh @@ -12,7 +12,7 @@ source "$NETWORK_NAME.env" cd .. -NIM_FLAGS="-d:release --lineTrace:on -d:chronicles_log_level=DEBUG -d:SECONDS_PER_SLOT=$SECONDS_PER_SLOT -d:SHARD_COUNT=$SHARD_COUNT -d:SLOTS_PER_EPOCH=$SLOTS_PER_EPOCH -d:DEFAULT_NETWORK=$NETWORK_NAME --hints:off --verbosity:0" +NIM_FLAGS="-d:release --lineTrace:on -d:chronicles_log_level=DEBUG -d:network_type=$NETWORK_TYPE -d:SECONDS_PER_SLOT=$SECONDS_PER_SLOT -d:SHARD_COUNT=$SHARD_COUNT -d:SLOTS_PER_EPOCH=$SLOTS_PER_EPOCH -d:DEFAULT_NETWORK=$NETWORK_NAME --hints:off --verbosity:0" BEACON_NODE_BIN="build/${NETWORK_NAME}_node" @@ -29,7 +29,7 @@ echo "Done! You're now ready to connect to $NETWORK_NAME by running:" echo echo " $BEACON_NODE_BIN" echo -echo "Database and configuration files placed in:" +echo "Database and configuration files will be placed in:" echo echo " ${HOME}/.cache/nimbus/BeaconNode/${NETWORK_NAME}" echo diff --git a/scripts/reset_testnet.sh b/scripts/reset_testnet.sh index 1b61789f3d..781ab85deb 100755 --- a/scripts/reset_testnet.sh +++ b/scripts/reset_testnet.sh @@ -19,7 +19,7 @@ NETWORK_DIR=$WWW_DIR/$NETWORK_NAME regenTestnetFiles() { NIM_FLAGS="-d:release -d:SECONDS_PER_SLOT=$SECONDS_PER_SLOT -d:SHARD_COUNT=$SHARD_COUNT -d:SLOTS_PER_EPOCH=$SLOTS_PER_EPOCH ${2:-}" - NETWORK_FLAVOUR=$1 + NETWORK_TYPE=$1 if [ ! -f $NETWORK_DIR/genesis.json ]; then rm -f $NETWORK_DIR/* @@ -28,7 +28,7 @@ regenTestnetFiles() { --outputDir="$NETWORK_DIR" fi - nim c -r $NIM_FLAGS beacon_chain/beacon_node \ + nim c -d:"network_type=$NETWORK_TYPE" -r $NIM_FLAGS beacon_chain/beacon_node \ --network=$NETWORK_NAME \ --dataDir=$DATA_DIR/node-0 \ createTestnet \ @@ -37,12 +37,13 @@ regenTestnetFiles() { --totalValidators=$VALIDATOR_COUNT \ --lastUserValidator=$LAST_USER_VALIDATOR \ --outputGenesis=$NETWORK_DIR/genesis.json \ - --outputNetwork=$NETWORK_DIR/$NETWORK_FLAVOUR-network.json \ + --outputNetwork=$NETWORK_DIR/$NETWORK_TYPE-network.json \ --bootstrapAddress=$PUBLIC_IP \ --bootstrapPort=$BOOTSTRAP_PORT \ --genesisOffset=600 # Delay in seconds } regenTestnetFiles rlpx -# regenTestnetFiles libp2p -d:withLibP2P +regenTestnetFiles libp2p_spec +regenTestnetFiles libp2p_native diff --git a/scripts/testnet0.env b/scripts/testnet0.env index ebc41138cd..a12a282433 100644 --- a/scripts/testnet0.env +++ b/scripts/testnet0.env @@ -1,4 +1,5 @@ -NETWORK_ID=1000001 +NETWORK_ID=10 +NETWORK_TYPE=rlpx SHARD_COUNT=16 SLOTS_PER_EPOCH=16 SECONDS_PER_SLOT=30 diff --git a/scripts/testnet1.env b/scripts/testnet1.env index 9d9f6154e9..b02dbf75f9 100644 --- a/scripts/testnet1.env +++ b/scripts/testnet1.env @@ -1,4 +1,5 @@ -NETWORK_ID=2000000 +NETWORK_ID=20 +NETWORK_TYPE=libp2p_spec SHARD_COUNT=16 SLOTS_PER_EPOCH=16 SECONDS_PER_SLOT=30 diff --git a/tests/test_peer_connection.nim b/tests/test_peer_connection.nim new file mode 100644 index 0000000000..86e10a88b4 --- /dev/null +++ b/tests/test_peer_connection.nim @@ -0,0 +1,40 @@ +import + unittest, os, + chronos, confutils, + ../beacon_chain/[conf, eth2_network] + +template asyncTest*(name, body: untyped) = + test name: + proc scenario {.async.} = body + waitFor scenario() + +asyncTest "connect two nodes": + let tempDir = getTempDir() / "peers_test" + + var c1 = BeaconNodeConf.defaults + c1.dataDir = OutDir(tempDir / "node-1") + c1.tcpPort = 50000 + c1.nat = "none" + + var n1PersistentAddress = c1.getPersistenBootstrapAddr( + parseIpAddress("127.0.0.1"), Port c1.tcpPort) + + var n1 = await createEth2Node(c1) + + echo "Node 1 persistent address: ", n1PersistentAddress + + when networkBackend != rlpxBackend: + var n1ActualAddress = await n1.daemon.identity() + echo "Node 1 actual address:", n1ActualAddress + + echo "Press any key to continue" + discard stdin.readLine() + + var c2 = BeaconNodeConf.defaults + c2.dataDir = OutDir(tempDir / "node-2") + c2.tcpPort = 50001 + c2.nat = "none" + var n2 = await createEth2Node(c2) + + await n2.connectToNetwork(bootstrapNodes = @[n1PersistentAddress]) +