This repository has been archived by the owner on Jul 1, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 146
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement UDP communication services for discv5
- Loading branch information
1 parent
33c4894
commit 904084b
Showing
3 changed files
with
130 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
import logging | ||
from typing import ( | ||
NamedTuple, | ||
) | ||
|
||
from trio.socket import ( | ||
SocketType, | ||
) | ||
from trio.abc import ( | ||
ReceiveChannel, | ||
SendChannel, | ||
) | ||
|
||
from p2p.trio_service import ( | ||
as_service, | ||
Manager, | ||
) | ||
|
||
from p2p.discv5.constants import ( | ||
DATAGRAM_BUFFER_SIZE, | ||
) | ||
|
||
|
||
class Endpoint(NamedTuple): | ||
ip_address: bytes | ||
port: int | ||
|
||
|
||
class IncomingDatagram(NamedTuple): | ||
datagram: bytes | ||
sender: Endpoint | ||
|
||
|
||
class OutgoingDatagram(NamedTuple): | ||
datagram: bytes | ||
receiver: Endpoint | ||
|
||
|
||
@as_service | ||
async def DatagramReceiver(manager: Manager, | ||
socket: SocketType, | ||
incoming_datagram_send_channel: SendChannel[IncomingDatagram], | ||
) -> None: | ||
"""Read datagrams from a socket and send them to a channel.""" | ||
logger = logging.getLogger('p2p.discv5.channel_services.DatagramReceiver') | ||
|
||
async with incoming_datagram_send_channel: | ||
while True: | ||
datagram, sender = await socket.recvfrom(DATAGRAM_BUFFER_SIZE) | ||
logger.debug(f"Received {len(datagram)} bytes from {sender}") | ||
await incoming_datagram_send_channel.send(IncomingDatagram(datagram, sender)) | ||
|
||
|
||
@as_service | ||
async def DatagramSender(manager: Manager, | ||
outgoing_datagram_receive_channel: ReceiveChannel[OutgoingDatagram], | ||
socket: SocketType, | ||
) -> None: | ||
"""Take datagrams from a channel and send them via a socket to their designated receivers.""" | ||
logger = logging.getLogger('p2p.discv5.channel_services.DatagramSender') | ||
|
||
async with outgoing_datagram_receive_channel: | ||
async for datagram, receiver in outgoing_datagram_receive_channel: | ||
logger.debug(f"Sending {len(datagram)} bytes to {receiver}") | ||
await socket.sendto(datagram, receiver) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
import trio | ||
|
||
import pytest | ||
|
||
from p2p.trio_service import ( | ||
background_service, | ||
) | ||
|
||
from p2p.discv5.channel_services import ( | ||
DatagramReceiver, | ||
DatagramSender, | ||
OutgoingDatagram, | ||
) | ||
|
||
|
||
@pytest.fixture | ||
async def socket_pair(): | ||
sending_socket = trio.socket.socket( | ||
family=trio.socket.AF_INET, | ||
type=trio.socket.SOCK_DGRAM, | ||
) | ||
receiving_socket = trio.socket.socket( | ||
family=trio.socket.AF_INET, | ||
type=trio.socket.SOCK_DGRAM, | ||
) | ||
# specifying 0 as port number results in using random available port | ||
await sending_socket.bind(("127.0.0.1", 0)) | ||
await receiving_socket.bind(("127.0.0.1", 0)) | ||
return sending_socket, receiving_socket | ||
|
||
|
||
async def test_datagram_receiver(socket_pair): | ||
sending_socket, receiving_socket = socket_pair | ||
receiver_address = receiving_socket.getsockname() | ||
sender_address = sending_socket.getsockname() | ||
|
||
send_channel, receive_channel = trio.open_memory_channel(1) | ||
async with background_service(DatagramReceiver(receiving_socket, send_channel)): | ||
data = b"some packet" | ||
|
||
await sending_socket.sendto(data, receiver_address) | ||
with trio.fail_after(0.5): | ||
received_datagram = await receive_channel.receive() | ||
|
||
assert received_datagram.datagram == data | ||
assert received_datagram.sender == sender_address | ||
|
||
|
||
async def test_datagram_sender(socket_pair): | ||
sending_socket, receiving_socket = socket_pair | ||
receiver_address = receiving_socket.getsockname() | ||
sender_address = sending_socket.getsockname() | ||
|
||
send_channel, receive_channel = trio.open_memory_channel(1) | ||
async with background_service(DatagramSender(receive_channel, sending_socket)): | ||
outgoing_datagram = OutgoingDatagram(b"some packet", receiver_address) | ||
await send_channel.send(outgoing_datagram) | ||
|
||
with trio.fail_after(0.5): | ||
data, sender = await receiving_socket.recvfrom(1024) | ||
assert data == outgoing_datagram.datagram | ||
assert sender == sender_address |