Skip to content

Commit

Permalink
Merge pull request #3625 from lbryio/dht_crawler
Browse files Browse the repository at this point in the history
Add script to collect DHT metrics
  • Loading branch information
shyba authored Sep 7, 2022
2 parents a334a93 + c7c2d6f commit f745560
Show file tree
Hide file tree
Showing 6 changed files with 591 additions and 14 deletions.
5 changes: 5 additions & 0 deletions docker/Dockerfile.dht_node
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ FROM debian:10-slim

ARG user=lbry
ARG projects_dir=/home/$user
ARG db_dir=/database

ARG DOCKER_TAG
ARG DOCKER_COMMIT=docker
Expand All @@ -27,12 +28,16 @@ RUN groupadd -g 999 $user && useradd -m -u 999 -g $user $user

COPY . $projects_dir
RUN chown -R $user:$user $projects_dir
RUN mkdir -p $db_dir
RUN chown -R $user:$user $db_dir

USER $user
WORKDIR $projects_dir

RUN python3 -m pip install -U setuptools pip
RUN make install
RUN python3 docker/set_build.py
RUN rm ~/.cache -rf
VOLUME $db_dir
ENTRYPOINT ["python3", "scripts/dht_node.py"]

10 changes: 7 additions & 3 deletions lbry/dht/peer.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ def update_contact_triple(self, node_id: bytes, address: str, udp_port: int):
self._node_id_reverse_mapping[node_id] = (address, udp_port)
self.peer_manager_keys_metric.labels("global").set(self.count_cache_keys())

def get_node_id_for_endpoint(self, address: str, port: int):
    """
    Look up the entry stored in ``_node_id_mapping`` for an endpoint.

    :param address: address half of the ``(address, port)`` lookup key.
    :param port: port half of the ``(address, port)`` lookup key.
    :return: the mapped value (presumably the node id last recorded for that
        endpoint - confirm against where the mapping is populated), or ``None``
        when the endpoint has no entry.
    """
    return self._node_id_mapping.get((address, port))

def prune(self): # TODO: periodically call this
now = self._loop.time()
to_pop = []
Expand Down Expand Up @@ -150,9 +153,10 @@ def contact_triple_is_good(self, node_id: bytes, address: str, udp_port: int):
def peer_is_good(self, peer: 'KademliaPeer'):
return self.contact_triple_is_good(peer.node_id, peer.address, peer.udp_port)

def decode_tcp_peer_from_compact_address(self, compact_address: bytes) -> 'KademliaPeer': # pylint: disable=no-self-use
node_id, address, tcp_port = decode_compact_address(compact_address)
return make_kademlia_peer(node_id, address, udp_port=None, tcp_port=tcp_port)

def decode_tcp_peer_from_compact_address(compact_address: bytes) -> 'KademliaPeer':
    """
    Build a kademlia peer carrying a TCP port from a compact address.

    The compact address is decoded into a node id, an address and a TCP port,
    which are handed to make_kademlia_peer() with ``udp_port=None``.

    Nothing is caught here: any exception raised while decoding or while
    constructing the peer propagates to the caller.

    :param compact_address: the encoded address to decode.
    :return: the peer constructed from the decoded fields.
    """
    node_id, address, tcp_port = decode_compact_address(compact_address)
    return make_kademlia_peer(node_id, address, udp_port=None, tcp_port=tcp_port)


@dataclass(unsafe_hash=True)
Expand Down
24 changes: 14 additions & 10 deletions lbry/dht/protocol/iterative_find.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from lbry.dht import constants
from lbry.dht.error import RemoteException, TransportNotConnected
from lbry.dht.protocol.distance import Distance
from lbry.dht.peer import make_kademlia_peer
from lbry.dht.peer import make_kademlia_peer, decode_tcp_peer_from_compact_address
from lbry.dht.serialization.datagram import PAGE_KEY

if TYPE_CHECKING:
Expand All @@ -26,6 +26,15 @@ def found(self) -> bool:
def get_close_triples(self) -> typing.List[typing.Tuple[bytes, str, int]]:
raise NotImplementedError()

def get_close_kademlia_peers(self, peer_info) -> typing.Iterator['KademliaPeer']:
    """
    Lazily yield a kademlia peer for every contact triple in this response.

    Each ``(node_id, address, udp_port)`` triple from get_close_triples() is
    passed to make_kademlia_peer(). A triple for which that call raises
    ValueError is skipped and a warning naming the responding peer is logged.

    :param peer_info: the peer that sent this response; only its ``address``
        and ``udp_port`` attributes are read, and only for the warning message.
    :return: an iterator over the peers that could be constructed.
    """
    for node_id, address, udp_port in self.get_close_triples():
        try:
            yield make_kademlia_peer(node_id, address, udp_port)
        except ValueError:
            log.warning("misbehaving peer %s:%i returned peer with reserved ip %s:%i", peer_info.address,
                        peer_info.udp_port, address, udp_port)


class FindNodeResponse(FindResponse):
def __init__(self, key: bytes, close_triples: typing.List[typing.Tuple[bytes, str, int]]):
Expand Down Expand Up @@ -125,13 +134,8 @@ def _add_active(self, peer, force=False):

async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse):
    """
    Record a probe response and register the peers it reported.

    The responding peer is added to the active set first, followed by every
    peer the response yields from get_close_kademlia_peers(). The response is
    then handed to check_result_ready() and the state is logged.

    :param peer: the peer that answered the probe.
    :param response: the parsed response received from ``peer``.
    """
    self._add_active(peer)
    # The response converts its own contact triples into peers and logs the
    # ones it rejects, so a second inline conversion loop here would process
    # every triple twice and emit each warning twice.
    for new_peer in response.get_close_kademlia_peers(peer):
        self._add_active(new_peer)
    self.check_result_ready(response)
    self._log_state(reason="check result")

Expand Down Expand Up @@ -319,7 +323,7 @@ async def send_probe(self, peer: 'KademliaPeer') -> FindValueResponse:
decoded_peers = set()
for compact_addr in parsed.found_compact_addresses:
try:
decoded_peers.add(self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr))
decoded_peers.add(decode_tcp_peer_from_compact_address(compact_addr))
except ValueError:
log.warning("misbehaving peer %s:%i returned invalid peer for blob",
peer.address, peer.udp_port)
Expand All @@ -341,7 +345,7 @@ async def send_probe(self, peer: 'KademliaPeer') -> FindValueResponse:

def check_result_ready(self, response: FindValueResponse):
if response.found:
blob_peers = [self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr)
blob_peers = [decode_tcp_peer_from_compact_address(compact_addr)
for compact_addr in response.found_compact_addresses]
to_yield = []
for blob_peer in blob_peers:
Expand Down
2 changes: 1 addition & 1 deletion lbry/extras/daemon/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from lbry.conf import Config
from lbry.extras.cli import execute_command
from lbry.conf import Config


def daemon_rpc(conf: Config, method: str, **kwargs):
Expand Down
Loading

0 comments on commit f745560

Please sign in to comment.