Skip to content

Commit

Permalink
Restore compilation with libp2p_native after the latest changes in th…
Browse files Browse the repository at this point in the history
…e spec back-end
  • Loading branch information
zah committed Jun 24, 2019
1 parent 1bcd94a commit 84afb77
Showing 1 changed file with 54 additions and 18 deletions.
72 changes: 54 additions & 18 deletions beacon_chain/libp2p_backend.nim
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type

PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe.}
NetworkStateInitializer* = proc(network: EthereumNode): RootRef {.gcsafe.}
HandshakeStep* = proc(peer: Peer, handshakeStream: P2PStream): Future[void] {.gcsafe.}
HandshakeStep* = proc(peer: Peer, stream: P2PStream): Future[void] {.gcsafe.}
DisconnectionHandler* = proc(peer: Peer): Future[void] {.gcsafe.}
ThunkProc* = proc(daemon: DaemonAPI, stream: P2PStream): Future[void] {.gcsafe.}
MessageContentPrinter* = proc(msg: pointer): string {.gcsafe.}
Expand All @@ -96,8 +96,7 @@ proc init*(T: type Eth2Node, daemon: DaemonAPI): Future[T] {.async.} =
new result
result.daemon = daemon
result.daemon.userData = result
result.maxInactivityAllowed = 15.minutes # TODO: Read this from the config
init result.peers
result.peers = initTable[PeerID, Peer]()

newSeq result.protocolStates, allProtocols.len
for proto in allProtocols:
Expand Down Expand Up @@ -151,7 +150,7 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
# Send the request
let stream = streamFut.read
let sent = await stream.transp.write(requestBytes)
if sent != requestBytes:
if sent != requestBytes.len:
await disconnectAndRaise(peer, FaultOrError, "Incomplete send")

# Read the response
Expand Down Expand Up @@ -222,6 +221,7 @@ proc init*(T: type Peer, network: Eth2Node, id: PeerID): Peer =
result.network = network
result.awaitedMessages = initTable[CompressedMsgId, FutureBase]()
result.connectionState = Connected
result.maxInactivityAllowed = 15.minutes # TODO: Read this from the config
newSeq result.protocolStates, allProtocols.len
for i in 0 ..< allProtocols.len:
let proto = allProtocols[i]
Expand All @@ -236,6 +236,9 @@ proc performProtocolHandshakes*(peer: Peer) {.async.} =

await all(subProtocolsHandshakes)

template initializeConnection*(peer: Peer): auto =
performProtocolHandshakes(peer)

template getRecipient(peer: Peer): Peer =
peer

Expand Down Expand Up @@ -357,7 +360,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
result.ResponderType = Responder

result.afterProtocolInit = proc (p: P2PProtocol) =
p.onPeerConnected.params.add newIdentDefs(ident"handshakeStream", P2PStream)
p.onPeerConnected.params.add newIdentDefs(streamVar, P2PStream)

result.implementMsg = proc (msg: Message) =
let
Expand All @@ -371,7 +374,11 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
msg.userHandler.params.insert(2, newIdentDefs(streamVar, P2PStream))
msg.initResponderCall.add streamVar

let awaitUserHandler = msg.genAwaitUserHandler(newCall("get", receivedMsg), [peerVar, streamVar])
##
## Implemenmt Thunk
##
let awaitUserHandler = msg.genAwaitUserHandler(
newCall("get", receivedMsg), [peerVar, streamVar])

let tracing = when tracingEnabled:
quote do: logReceivedMsg(`streamVar`.peer, `receivedMsg`.get)
Expand All @@ -382,17 +389,46 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
requestDataTimeout = newCall(milliseconds, newLit(defaultIncomingReqTimeout))
thunkName = ident(msgName & "_thunk")

msg.defineThunk quote do:
proc `thunkName`(`daemonVar`: `DaemonAPI`, `streamVar`: `P2PStream`) {.async, gcsafe.} =
var `deadlineVar` = sleepAsync `requestDataTimeout`
var `receivedMsg` = `await` readMsg(`streamVar`, `msgRecName`, `deadlineVar`)
if `receivedMsg`.isNone:
# TODO: This peer is misbehaving, perhaps we should penalize him somehow
return
let `peerVar` = `peerFromStream`(`daemonVar`, `streamVar`)
`tracing`
`awaitUserHandler`
`resolveNextMsgFutures`(`peerVar`, get(`receivedMsg`))
msg.defineThunk if msg.kind == msgHandshake:
# In LibP2P protocols, the `onPeerConnected` handler is executed when the
# other peer opens a stream. Contrary to other thunk procs, the message is
# not immediately deserialized. Instead, the handshake "sender proc" acts
# as an exchanger that sends our handshake message while deserializing the
# contents of the other peer's handshake.
# Thus, the very first communication act of the `onPeerConnected` handler
# must be the execution of the handshake exchanger.
let handshake = msg.protocol.onPeerConnected
if handshake.isNil:
macros.error "A LibP2P protocol with a handshake must also include an " &
"`onPeerConnected` handler.", msg.procDef

# We must generate a forward declaration for the `onPeerConnected` handler,
# so we can call it from the thunk proc:
let handshakeProcName = handshake.name
msg.protocol.outRecvProcs.add quote do:
proc `handshakeProcName`(`peerVar`: `Peer`,
`streamVar`: `P2PStream`) {.async, gcsafe.}

quote:
proc `thunkName`(`daemonVar`: `DaemonAPI`,
`streamVar`: `P2PStream`): Future[void] {.gcsafe.} =
let `peerVar` = `peerFromStream`(`daemonVar`, `streamVar`)
return `handshakeProcName`(`peerVar`, `streamVar`)
else:
quote:
proc `thunkName`(`daemonVar`: `DaemonAPI`,
`streamVar`: `P2PStream`) {.async, gcsafe.} =
var `deadlineVar` = sleepAsync `requestDataTimeout`
var `receivedMsg` = `await` readMsg(`streamVar`,
`msgRecName`,
`deadlineVar`)
if `receivedMsg`.isNone:
# TODO: This peer is misbehaving, perhaps we should penalize him somehow
return
let `peerVar` = `peerFromStream`(`daemonVar`, `streamVar`)
`tracing`
`awaitUserHandler`
`resolveNextMsgFutures`(`peerVar`, get(`receivedMsg`))

##
## Implement Senders and Handshake
Expand All @@ -410,7 +446,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =

handshakeExchanger.setBody quote do:
let
stream = ident"handshakeStream"
stream = ident "stream"
rawSendProc = `bindSymOp` `rawSendProc`
params = `paramsArray`
lazySendCall = newCall(rawSendProc, params)
Expand Down

0 comments on commit 84afb77

Please sign in to comment.