From 91d360a0f7dff1c3bf005010cb6d7bb6f19d890f Mon Sep 17 00:00:00 2001 From: Rowdy Mitchell Chotkan Date: Wed, 31 May 2023 11:35:52 +0200 Subject: [PATCH 1/9] Upgrade PyQt, Yarl, and LibTorrent dependencies --- requirements-core.txt | 4 ++-- requirements.txt | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/requirements-core.txt b/requirements-core.txt index f5a734f0ca8..3e52df621f6 100644 --- a/requirements-core.txt +++ b/requirements-core.txt @@ -20,10 +20,10 @@ pyyaml==6.0 sentry-sdk==1.14.0 service-identity==21.1.0 yappi==1.4.0 -yarl==1.7.2 # keep this dependency higher than 1.6.3. See: https://github.com/aio-libs/yarl/issues/517 +yarl==1.8.2 # keep this dependency higher than 1.6.3. See: https://github.com/aio-libs/yarl/issues/517 bitarray==2.5.1 pyipv8==2.10.0 -libtorrent==1.2.15 +libtorrent==1.2.19 file-read-backwards==2.0.0 Brotli==1.0.9 # to prevent AttributeError on macOs: module 'brotli' has no attribute 'error' (in urllib3.response) human-readable==1.3.2 diff --git a/requirements.txt b/requirements.txt index 0085760f29e..c3f311c6ba2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,6 @@ Pillow==9.3.0 PyQt5==5.15.1 -PyQt5-sip==12.8.1 +PyQt5-sip==12.11.1 pyqtgraph==0.12.3 PyQtWebEngine==5.15.2 From 066a02bd005dba6f1f109a62342bf1c127dfc610 Mon Sep 17 00:00:00 2001 From: Rowdy Mitchell Chotkan Date: Thu, 22 Jun 2023 12:42:38 +0200 Subject: [PATCH 2/9] Add initial rendezvous design --- .../community/popularity_community.py | 101 +++++++++++++++++- .../popularity/popularity_component.py | 5 + .../popularity/rendezvous/__init__.py | 0 .../popularity/rendezvous/db/__init__.py | 0 .../popularity/rendezvous/db/database.py | 33 ++++++ .../rendezvous/db/orm_bindings/__init__.py | 0 .../rendezvous/db/orm_bindings/certificate.py | 14 +++ .../rendezvous/db/orm_bindings/misc.py | 9 ++ .../popularity/rendezvous/rendezvous.py | 56 ++++++++++ .../popularity/rendezvous/rendezvous_cache.py | 23 ++++ 10 files changed, 239 insertions(+), 2 deletions(-) create mode 100644 src/tribler/core/components/popularity/rendezvous/__init__.py create mode 100644 src/tribler/core/components/popularity/rendezvous/db/__init__.py create mode 100644 src/tribler/core/components/popularity/rendezvous/db/database.py create mode 100644 src/tribler/core/components/popularity/rendezvous/db/orm_bindings/__init__.py create mode 100644 src/tribler/core/components/popularity/rendezvous/db/orm_bindings/certificate.py create mode 100644 src/tribler/core/components/popularity/rendezvous/db/orm_bindings/misc.py create mode 100644 src/tribler/core/components/popularity/rendezvous/rendezvous.py create mode 100644 src/tribler/core/components/popularity/rendezvous/rendezvous_cache.py diff --git a/src/tribler/core/components/popularity/community/popularity_community.py b/src/tribler/core/components/popularity/community/popularity_community.py index aad18d6741f..da5aed52af3 100644 --- a/src/tribler/core/components/popularity/community/popularity_community.py +++ b/src/tribler/core/components/popularity/community/popularity_community.py @@ -1,15 +1,24 @@ from __future__ import annotations import random +import secrets from binascii import unhexlify from typing import List, TYPE_CHECKING +from cryptography.exceptions import InvalidSignature from ipv8.lazy_community import lazy_wrapper +from ipv8.messaging.interfaces.udp.endpoint import UDPv4Address, UDPv4LANAddress +from ipv8.messaging.serialization import PackError from pony.orm import db_session from tribler.core.components.metadata_store.remote_query_community.remote_query_community import RemoteQueryCommunity from tribler.core.components.popularity.community.payload import PopularTorrentsRequest, TorrentsHealthPayload from tribler.core.components.popularity.community.version_community_mixin import VersionCommunityMixin +from tribler.core.components.popularity.rendezvous.rendezvous import RendezvousRequestPayload, \ + RendezvousResponsePayload, RawRendezvousResponsePayload, \ + RendezvousChallenge, RendezvousSignature +from tribler.core.components.popularity.rendezvous.db.database import RendezvousDatabase +from tribler.core.components.popularity.rendezvous.rendezvous_cache import RendezvousCache from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo from tribler.core.utilities.pony_utils import run_threaded from tribler.core.utilities.unicode import hexlify @@ -35,11 +44,16 @@ class PopularityCommunity(RemoteQueryCommunity, VersionCommunityMixin): GOSSIP_POPULAR_TORRENT_COUNT = 10 GOSSIP_RANDOM_TORRENT_COUNT = 10 - community_id = unhexlify('9aca62f878969c437da9844cba29a134917e1648') + PING_INTERVAL_RENDEZVOUS = 60 # seconds + DB_NAME = 'rendezvous.db' - def __init__(self, *args, torrent_checker=None, **kwargs): + community_id = unhexlify('9aca62f878969c437da9844cba29a134917e1649') + + def __init__(self, *args, torrent_checker=None, rendezvous_db=None, **kwargs): # Creating a separate instance of Network for this community to find more peers super().__init__(*args, **kwargs) + + self.rdb: RendezvousDatabase = rendezvous_db self.torrent_checker: TorrentChecker = torrent_checker self.add_message_handler(TorrentsHealthPayload, self.on_torrents_health) @@ -51,6 +65,56 @@ def __init__(self, *args, torrent_checker=None, **kwargs): # Init version community message handlers self.init_version_community() + self.rendezvous_cache = RendezvousCache() + + def send_introduction_request(self, peer): + rendezvous = RendezvousChallenge(secrets.token_bytes(16)) + extra_payload = self.serializer.pack_serializable(RendezvousRequestPayload(rendezvous)) + self.logger.debug("Piggy-backing Rendezvous to %s:%d", peer.address[0], peer.address[1]) + packet = self.create_introduction_request(peer.address, extra_bytes=extra_payload, + new_style=peer.new_style_intro) + self.endpoint.send(peer.address, packet) + self.rendezvous_cache[peer.mid] = rendezvous.nonce + + # We override this method to add the rendezvous certificate to the introduction request + def on_introduction_request(self, peer, dist, payload): + if 0 <= self.max_peers < len(self.get_peers()): + self.logger.debug("Dropping introduction request from (%s, %d): too many peers!", + peer.address[0], peer.address[1]) + return + + extra_payload = b'' + if payload.extra_bytes: + self.logger.debug("Received introduction request with extra bytes") + try: + rendezvous_request, _ = self.serializer.unpack_serializable(RendezvousRequestPayload, + payload.extra_bytes) + extra_payload = self._handle_rendezvous_request(rendezvous_request) + + except PackError as e: + self.logger.warning("Failed to unpack RendezvousRequestPayload: %s", e) + + if isinstance(payload.source_lan_address, UDPv4Address): + peer.address = UDPv4LANAddress(*payload.source_lan_address) + self.network.add_verified_peer(peer) + self.network.discover_services(peer, [self.community_id, ]) + + packet = self.create_introduction_response(payload.destination_address, peer.address, payload.identifier, + extra_bytes=extra_payload, new_style=peer.new_style_intro) + + self.endpoint.send(peer.address, packet) + self.introduction_request_callback(peer, dist, payload) + + def introduction_response_callback(self, peer, dist, payload): + super().introduction_response_callback(peer, dist, payload) + if payload.extra_bytes: + self.logger.debug("Received introduction response with extra bytes") + try: + raw_rendezvous_response, _ = self.serializer.unpack_serializable(RawRendezvousResponsePayload, + payload.extra_bytes) + self._handle_rendezvous_response(peer, raw_rendezvous_response) + except PackError as e: + self.logger.warning("Failed to unpack RendezvousResponsePayload: %s", e) def introduction_request_callback(self, peer, dist, payload): super().introduction_request_callback(peer, dist, payload) @@ -140,3 +204,36 @@ def get_random_torrents(self) -> List[HealthInfo]: random_torrents = random.sample(checked_and_alive, num_torrents_to_send) return random_torrents + + def _handle_rendezvous_request(self, payload: RendezvousRequestPayload) -> bytes: + if not payload: + self.logger.warning("Received invalid rendezvous request") + return b'' + + signature = payload.challenge.sign(self.my_peer.key) + payload = RendezvousResponsePayload(payload.challenge, RendezvousSignature(signature)) + if not self.crypto.is_valid_signature(self.my_peer.public_key, + self.serializer.pack_serializable(payload.challenge), + signature): + self.logger.warning("Received rendezvous response with invalid signature") + + return self.serializer.pack_serializable(payload) + + def _handle_rendezvous_response(self, peer, raw_payload: RawRendezvousResponsePayload): + signature, _ = self.serializer.unpack_serializable(RendezvousSignature, raw_payload.signature) + challenge, _ = self.serializer.unpack_serializable(RendezvousChallenge, raw_payload.challenge) + + if not self.rendezvous_cache[peer.mid] == challenge.nonce: + self.logger.warning(f"Received invalid rendezvous response from {peer.mid}") + return + + if not self.crypto.is_valid_signature(peer.key, raw_payload.challenge, signature.signature): + self.logger.warning(f"Received invalid signature from {peer.mid}") + return + + self.logger.debug(f"Received valid rendezvous response from {peer.mid}") + with db_session: + certificate = self.rdb.Certificate.get(public_key=peer.mid) + if not certificate: + certificate = self.rdb.Certificate(public_key=peer.mid, counter=0) + certificate.counter += 1 diff --git a/src/tribler/core/components/popularity/popularity_component.py b/src/tribler/core/components/popularity/popularity_component.py index bbc54a5ac86..9cf44d3a670 100644 --- a/src/tribler/core/components/popularity/popularity_component.py +++ b/src/tribler/core/components/popularity/popularity_component.py @@ -5,8 +5,10 @@ from tribler.core.components.ipv8.ipv8_component import INFINITE, Ipv8Component from tribler.core.components.metadata_store.metadata_store_component import MetadataStoreComponent from tribler.core.components.popularity.community.popularity_community import PopularityCommunity +from tribler.core.components.popularity.rendezvous.db.database import RendezvousDatabase from tribler.core.components.reporter.reporter_component import ReporterComponent from tribler.core.components.torrent_checker.torrent_checker_component import TorrentCheckerComponent +from tribler.core.utilities.simpledefs import STATEDIR_DB_DIR class PopularityComponent(Component): @@ -22,6 +24,8 @@ async def run(self): metadata_store_component = await self.require_component(MetadataStoreComponent) torrent_checker_component = await self.require_component(TorrentCheckerComponent) + rendezvous_db = RendezvousDatabase(db_path=self.session.config.state_dir / STATEDIR_DB_DIR / "rendezvous.db") + config = self.session.config community = PopularityCommunity(self._ipv8_component.peer, self._ipv8_component.ipv8.endpoint, @@ -29,6 +33,7 @@ async def run(self): settings=config.popularity_community, rqc_settings=config.remote_query_community, metadata_store=metadata_store_component.mds, + rendezvous_db=rendezvous_db, torrent_checker=torrent_checker_component.torrent_checker) self.community = community diff --git a/src/tribler/core/components/popularity/rendezvous/__init__.py b/src/tribler/core/components/popularity/rendezvous/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/tribler/core/components/popularity/rendezvous/db/__init__.py b/src/tribler/core/components/popularity/rendezvous/db/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/tribler/core/components/popularity/rendezvous/db/database.py b/src/tribler/core/components/popularity/rendezvous/db/database.py new file mode 100644 index 00000000000..6005993f8bc --- /dev/null +++ b/src/tribler/core/components/popularity/rendezvous/db/database.py @@ -0,0 +1,33 @@ +from pathlib import Path +from typing import Union + +from pony.orm import Database, db_session + +from tribler.core.components.metadata_store.db.orm_bindings import misc +from tribler.core.components.popularity.rendezvous.db.orm_bindings import certificate +from tribler.core.utilities.utilities import MEMORY_DB + + +class RendezvousDatabase: + DB_VERSION = 0 + + def __init__(self, db_path: Union[Path, type(MEMORY_DB)]): + + self.database = Database() + + self.MiscData = misc.define_binding(self.database) + self.Certificate = certificate.define_binding(self.database) + + if db_path is MEMORY_DB: + create_db = True + db_path_string = ":memory:" + else: + create_db = not db_path.is_file() + db_path_string = str(db_path) + + self.database.bind(provider='sqlite', filename=db_path_string, create_db=create_db, timeout=120.0) + self.database.generate_mapping(create_tables=create_db) + + if create_db: + with db_session: + self.MiscData(name="db_version", value=str(self.DB_VERSION)) diff --git a/src/tribler/core/components/popularity/rendezvous/db/orm_bindings/__init__.py b/src/tribler/core/components/popularity/rendezvous/db/orm_bindings/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/tribler/core/components/popularity/rendezvous/db/orm_bindings/certificate.py b/src/tribler/core/components/popularity/rendezvous/db/orm_bindings/certificate.py new file mode 100644 index 00000000000..ca4323c457f --- /dev/null +++ b/src/tribler/core/components/popularity/rendezvous/db/orm_bindings/certificate.py @@ -0,0 +1,14 @@ +from pony.orm import Required, db_session + + +def define_binding(db): + class RendezvousCertificate(db.Entity): + public_key = Required(bytes, index=True) + counter = Required(int) + + @staticmethod + @db_session + def get_count(pk: bytes) -> int: + return RendezvousCertificate.get(public_key == pk).count() + + return RendezvousCertificate diff --git a/src/tribler/core/components/popularity/rendezvous/db/orm_bindings/misc.py b/src/tribler/core/components/popularity/rendezvous/db/orm_bindings/misc.py new file mode 100644 index 00000000000..e5434d09d1b --- /dev/null +++ b/src/tribler/core/components/popularity/rendezvous/db/orm_bindings/misc.py @@ -0,0 +1,9 @@ +from pony.orm import Optional, PrimaryKey + + +def define_binding(db): + class MiscData(db.Entity): + name = PrimaryKey(str) + value = Optional(str) + + return MiscData diff --git a/src/tribler/core/components/popularity/rendezvous/rendezvous.py b/src/tribler/core/components/popularity/rendezvous/rendezvous.py new file mode 100644 index 00000000000..6a7929ddfa5 --- /dev/null +++ b/src/tribler/core/components/popularity/rendezvous/rendezvous.py @@ -0,0 +1,56 @@ +import secrets +from dataclasses import dataclass + +from ipv8.keyvault.crypto import default_eccrypto +from ipv8.messaging.payload_dataclass import overwrite_dataclass, type_from_format +from ipv8.messaging.serialization import default_serializer + +dataclass = overwrite_dataclass(dataclass) + + +@dataclass +class RendezvousChallenge: + nonce: bytes + + def __str__(self): + return f"RendezvousRequest(public_key_b={self.nonce})" + + def sign(self, sk, crypto=default_eccrypto) -> bytes: + serialized = default_serializer.pack_serializable(self) + return crypto.create_signature(sk, serialized) + + +@dataclass +class RendezvousSignature: + signature: type_from_format('64s') + + def __str__(self): + return f"RendezvousSignature(signature={self.signature})" + + +@dataclass(msg_id=3) +class RendezvousRequestPayload: + challenge: RendezvousChallenge + + def __str__(self): + return f"RendezvousCertificateRequestPayload(certificate={self.challenge})" + + +@dataclass(msg_id=4) +class RawRendezvousResponsePayload: + challenge: type_from_format('varlenH') + signature: type_from_format('varlenH') + + def __str__(self): + return f"RendezvousCertificatePayload(rendezvous_certificate={self.challenge}, " \ + f"signature={self.signature})" + + +@dataclass(msg_id=4) +class RendezvousResponsePayload: + challenge: RendezvousChallenge + signature: RendezvousSignature + + def __str__(self): + return f"RendezvousCertificatePayload(rendezvous_certificate={self.challenge}, " \ + f"signature={self.signature})" diff --git a/src/tribler/core/components/popularity/rendezvous/rendezvous_cache.py b/src/tribler/core/components/popularity/rendezvous/rendezvous_cache.py new file mode 100644 index 00000000000..cdabefd5490 --- /dev/null +++ b/src/tribler/core/components/popularity/rendezvous/rendezvous_cache.py @@ -0,0 +1,23 @@ +import threading +from collections import defaultdict + + +class RendezvousCache: + + def __init__(self): + self._cache = defaultdict(bytes) + self._rendezvous_lock = threading.Lock() + + def __getitem__(self, item): + with self._rendezvous_lock: + return self._cache[item] + + def __setitem__(self, key, value): + with self._rendezvous_lock: + self._cache[key] = value + + def get_rendezvous_challenge(self, peer_mid): + return self.__getitem__(peer_mid) + + def set_rendezvous_challenge(self, peer_mid, challenge): + return self.__setitem__(peer_mid, challenge) From 31dbcc89e014b75948d37ba1c3b04ffbe7af1dae Mon Sep 17 00:00:00 2001 From: Rowdy Mitchell Chotkan Date: Tue, 27 Jun 2023 15:06:13 +0200 Subject: [PATCH 3/9] Finalize rendezvous design --- .../community/popularity_community.py | 73 +++++++++++++------ .../rendezvous/db/orm_bindings/certificate.py | 4 +- .../popularity/rendezvous/rendezvous.py | 7 +- .../popularity/rendezvous/rendezvous_cache.py | 37 +++++++--- 4 files changed, 86 insertions(+), 35 deletions(-) diff --git a/src/tribler/core/components/popularity/community/popularity_community.py b/src/tribler/core/components/popularity/community/popularity_community.py index da5aed52af3..ffac225333d 100644 --- a/src/tribler/core/components/popularity/community/popularity_community.py +++ b/src/tribler/core/components/popularity/community/popularity_community.py @@ -1,11 +1,9 @@ from __future__ import annotations import random -import secrets from binascii import unhexlify from typing import List, TYPE_CHECKING -from cryptography.exceptions import InvalidSignature from ipv8.lazy_community import lazy_wrapper from ipv8.messaging.interfaces.udp.endpoint import UDPv4Address, UDPv4LANAddress from ipv8.messaging.serialization import PackError @@ -14,11 +12,11 @@ from tribler.core.components.metadata_store.remote_query_community.remote_query_community import RemoteQueryCommunity from tribler.core.components.popularity.community.payload import PopularTorrentsRequest, TorrentsHealthPayload from tribler.core.components.popularity.community.version_community_mixin import VersionCommunityMixin +from tribler.core.components.popularity.rendezvous.db.database import RendezvousDatabase from tribler.core.components.popularity.rendezvous.rendezvous import RendezvousRequestPayload, \ RendezvousResponsePayload, RawRendezvousResponsePayload, \ RendezvousChallenge, RendezvousSignature -from tribler.core.components.popularity.rendezvous.db.database import RendezvousDatabase -from tribler.core.components.popularity.rendezvous.rendezvous_cache import RendezvousCache +from tribler.core.components.popularity.rendezvous.rendezvous_cache import RendezvousCache, EMPTY_PEER_CHALLENGE from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo from tribler.core.utilities.pony_utils import run_threaded from tribler.core.utilities.unicode import hexlify @@ -59,22 +57,27 @@ def __init__(self, *args, torrent_checker=None, rendezvous_db=None, **kwargs): self.add_message_handler(TorrentsHealthPayload, self.on_torrents_health) self.add_message_handler(PopularTorrentsRequest, self.on_popular_torrents_request) + self.add_message_handler(RendezvousRequestPayload, self.on_rendezvous_request) + self.add_message_handler(RendezvousResponsePayload, self.on_rendezvous_response) + self.logger.info('Popularity Community initialized (peer mid %s)', hexlify(self.my_peer.mid)) self.register_task("gossip_random_torrents", self.gossip_random_torrents_health, interval=PopularityCommunity.GOSSIP_INTERVAL_FOR_RANDOM_TORRENTS) + self.register_task("ping_rendezvous", self.ping_rendezvous, + interval=PopularityCommunity.PING_INTERVAL_RENDEZVOUS) # Init version community message handlers self.init_version_community() self.rendezvous_cache = RendezvousCache() def send_introduction_request(self, peer): - rendezvous = RendezvousChallenge(secrets.token_bytes(16)) - extra_payload = self.serializer.pack_serializable(RendezvousRequestPayload(rendezvous)) + rendezvous_request = self._create_rendezvous_request() + extra_payload = self.serializer.pack_serializable(rendezvous_request) self.logger.debug("Piggy-backing Rendezvous to %s:%d", peer.address[0], peer.address[1]) packet = self.create_introduction_request(peer.address, extra_bytes=extra_payload, new_style=peer.new_style_intro) self.endpoint.send(peer.address, packet) - self.rendezvous_cache[peer.mid] = rendezvous.nonce + self.rendezvous_cache.add_peer(peer, rendezvous_request.challenge.nonce) # We override this method to add the rendezvous certificate to the introduction request def on_introduction_request(self, peer, dist, payload): @@ -89,8 +92,10 @@ def on_introduction_request(self, peer, dist, payload): try: rendezvous_request, _ = self.serializer.unpack_serializable(RendezvousRequestPayload, payload.extra_bytes) - extra_payload = self._handle_rendezvous_request(rendezvous_request) - + rendezvous_response = self._create_rendezvous_response(rendezvous_request.challenge) + # As we are sending the rendezvous response, we know this peer is interested in rendezvous. + self.rendezvous_cache.add_peer(peer) + extra_payload = self.serializer.pack_serializable(rendezvous_response) except PackError as e: self.logger.warning("Failed to unpack RendezvousRequestPayload: %s", e) @@ -105,6 +110,19 @@ def on_introduction_request(self, peer, dist, payload): self.endpoint.send(peer.address, packet) self.introduction_request_callback(peer, dist, payload) + @lazy_wrapper(RendezvousRequestPayload) + def on_rendezvous_request(self, peer, payload: RendezvousRequestPayload): + self.logger.debug("Received rendezvous request from %s:%d", peer.address[0], peer.address[1]) + # As we are sending the rendezvous response, we know this peer is interested in rendezvous. + self.rendezvous_cache.add_peer(peer) + rendezvous_response = self._create_rendezvous_response(payload.challenge) + self.ez_send(peer, rendezvous_response) + + @lazy_wrapper(RawRendezvousResponsePayload) + def on_rendezvous_response(self, peer, payload: RawRendezvousResponsePayload): + self.logger.debug("Received rendezvous response from %s:%d", peer.address[0], peer.address[1]) + self._handle_rendezvous_response(peer, payload) + def introduction_response_callback(self, peer, dist, payload): super().introduction_response_callback(peer, dist, payload) if payload.extra_bytes: @@ -113,6 +131,7 @@ def introduction_response_callback(self, peer, dist, payload): raw_rendezvous_response, _ = self.serializer.unpack_serializable(RawRendezvousResponsePayload, payload.extra_bytes) self._handle_rendezvous_response(peer, raw_rendezvous_response) + except PackError as e: self.logger.warning("Failed to unpack RendezvousResponsePayload: %s", e) @@ -140,6 +159,15 @@ def gossip_random_torrents_health(self): self.ez_send(random_peer, TorrentsHealthPayload.create(random_torrents, {})) + def ping_rendezvous(self): + # Remove peers that haven't replied in a while. + self.rendezvous_cache.clear_inactive_peers() + + for peer in self.rendezvous_cache.get_rendezvous_peers(): + payload = self._create_rendezvous_request() + self.rendezvous_cache.set_rendezvous_challenge(peer, payload.challenge.nonce) + self.ez_send(peer, payload) + @lazy_wrapper(TorrentsHealthPayload) async def on_torrents_health(self, peer, payload): self.logger.debug(f"Received torrent health information for " @@ -205,31 +233,31 @@ def get_random_torrents(self) -> List[HealthInfo]: random_torrents = random.sample(checked_and_alive, num_torrents_to_send) return random_torrents - def _handle_rendezvous_request(self, payload: RendezvousRequestPayload) -> bytes: - if not payload: - self.logger.warning("Received invalid rendezvous request") - return b'' - - signature = payload.challenge.sign(self.my_peer.key) - payload = RendezvousResponsePayload(payload.challenge, RendezvousSignature(signature)) - if not self.crypto.is_valid_signature(self.my_peer.public_key, - self.serializer.pack_serializable(payload.challenge), - signature): - self.logger.warning("Received rendezvous response with invalid signature") + def _create_rendezvous_request(self) -> RendezvousRequestPayload: + challenge = RendezvousChallenge.create() + payload = RendezvousRequestPayload(challenge) + return payload - return self.serializer.pack_serializable(payload) + def _create_rendezvous_response(self, challenge: RendezvousChallenge) -> RendezvousResponsePayload: + signature = challenge.sign(self.my_peer.key) + payload = RendezvousResponsePayload(challenge, RendezvousSignature(signature)) + return payload def _handle_rendezvous_response(self, peer, raw_payload: RawRendezvousResponsePayload): signature, _ = self.serializer.unpack_serializable(RendezvousSignature, raw_payload.signature) challenge, _ = self.serializer.unpack_serializable(RendezvousChallenge, raw_payload.challenge) - if not self.rendezvous_cache[peer.mid] == challenge.nonce: + expected_nonce = self.rendezvous_cache.get_rendezvous_challenge(peer) or EMPTY_PEER_CHALLENGE + if expected_nonce == EMPTY_PEER_CHALLENGE or expected_nonce != challenge.nonce: self.logger.warning(f"Received invalid rendezvous response from {peer.mid}") return if not self.crypto.is_valid_signature(peer.key, raw_payload.challenge, signature.signature): self.logger.warning(f"Received invalid signature from {peer.mid}") return + else: + # This nonce has been burned. + self.rendezvous_cache.clear_peer_challenge(peer) self.logger.debug(f"Received valid rendezvous response from {peer.mid}") with db_session: @@ -237,3 +265,4 @@ def _handle_rendezvous_response(self, peer, raw_payload: RawRendezvousResponsePa if not certificate: certificate = self.rdb.Certificate(public_key=peer.mid, counter=0) certificate.counter += 1 + return diff --git a/src/tribler/core/components/popularity/rendezvous/db/orm_bindings/certificate.py b/src/tribler/core/components/popularity/rendezvous/db/orm_bindings/certificate.py index ca4323c457f..c5c73029923 100644 --- a/src/tribler/core/components/popularity/rendezvous/db/orm_bindings/certificate.py +++ b/src/tribler/core/components/popularity/rendezvous/db/orm_bindings/certificate.py @@ -6,9 +6,9 @@ class RendezvousCertificate(db.Entity): public_key = Required(bytes, index=True) counter = Required(int) - @staticmethod + @classmethod @db_session - def get_count(pk: bytes) -> int: + def get_count(cls, pk: bytes) -> int: return RendezvousCertificate.get(public_key == pk).count() return RendezvousCertificate diff --git a/src/tribler/core/components/popularity/rendezvous/rendezvous.py b/src/tribler/core/components/popularity/rendezvous/rendezvous.py index 6a7929ddfa5..497d8899f3d 100644 --- a/src/tribler/core/components/popularity/rendezvous/rendezvous.py +++ b/src/tribler/core/components/popularity/rendezvous/rendezvous.py @@ -1,5 +1,6 @@ +import dataclasses import secrets -from dataclasses import dataclass +from dataclasses import dataclass, fields from ipv8.keyvault.crypto import default_eccrypto from ipv8.messaging.payload_dataclass import overwrite_dataclass, type_from_format @@ -19,6 +20,10 @@ def sign(self, sk, crypto=default_eccrypto) -> bytes: serialized = default_serializer.pack_serializable(self) return crypto.create_signature(sk, serialized) + @staticmethod + def create(): + return RendezvousChallenge(secrets.token_bytes(32)) + @dataclass class RendezvousSignature: diff --git a/src/tribler/core/components/popularity/rendezvous/rendezvous_cache.py b/src/tribler/core/components/popularity/rendezvous/rendezvous_cache.py index cdabefd5490..ee30c0f1e1e 100644 --- a/src/tribler/core/components/popularity/rendezvous/rendezvous_cache.py +++ b/src/tribler/core/components/popularity/rendezvous/rendezvous_cache.py @@ -1,23 +1,40 @@ import threading from collections import defaultdict +from time import time + +from ipv8.peer import Peer + +EMPTY_PEER_CHALLENGE = b'0' * 16 +RENDEZVOUS_TIMEOUT = 60 class RendezvousCache: def __init__(self): - self._cache = defaultdict(bytes) + self._cache = {} self._rendezvous_lock = threading.Lock() - def __getitem__(self, item): + def add_peer(self, peer, peer_challenge=EMPTY_PEER_CHALLENGE): with self._rendezvous_lock: - return self._cache[item] + self._cache[peer] = (peer_challenge, time()) - def __setitem__(self, key, value): - with self._rendezvous_lock: - self._cache[key] = value + def get_rendezvous_peers(self): + return self._cache.keys() + + def get_rendezvous_challenge(self, peer): + return self._cache[peer][0] - def get_rendezvous_challenge(self, peer_mid): - return self.__getitem__(peer_mid) + def set_rendezvous_challenge(self, peer, challenge): + return self.add_peer(peer, challenge) - def set_rendezvous_challenge(self, peer_mid, challenge): - return self.__setitem__(peer_mid, challenge) + def clear_inactive_peers(self, timeout=RENDEZVOUS_TIMEOUT): + with self._rendezvous_lock: + to_remove = [] + for peer, (peer_challenge, timestamp) in self._cache.items(): + if time() - timestamp > timeout: + to_remove.append(peer) + [self._cache.pop(peer) for peer in to_remove] + + def clear_peer_challenge(self, peer): + with self._rendezvous_lock: + self._cache[peer] = (EMPTY_PEER_CHALLENGE, time()) From b3e79cab0e91c45b5d19a2880b0e95be127fc907 Mon Sep 17 00:00:00 2001 From: Rowdy Mitchell Chotkan Date: Tue, 27 Jun 2023 15:06:38 +0200 Subject: [PATCH 4/9] Add rendezvous tests --- .../tests/test_popularity_community.py | 83 +++++++++++++++++++ .../popularity/rendezvous/tests/__init__.py | 0 .../rendezvous/tests/test_rendezvous_test.py | 43 ++++++++++ 3 files changed, 126 insertions(+) create mode 100644 src/tribler/core/components/popularity/rendezvous/tests/__init__.py create mode 100644 src/tribler/core/components/popularity/rendezvous/tests/test_rendezvous_test.py diff --git a/src/tribler/core/components/popularity/community/tests/test_popularity_community.py b/src/tribler/core/components/popularity/community/tests/test_popularity_community.py index 78415f61ed8..fcd7b07b0f9 100644 --- a/src/tribler/core/components/popularity/community/tests/test_popularity_community.py +++ b/src/tribler/core/components/popularity/community/tests/test_popularity_community.py @@ -4,6 +4,7 @@ from unittest.mock import Mock from ipv8.keyvault.crypto import default_eccrypto +from ipv8.peer import Peer from ipv8.test.base import TestBase from ipv8.test.mocking.ipv8 import MockIPv8 from pony.orm import db_session @@ -11,6 +12,8 @@ from tribler.core.components.metadata_store.db.store import MetadataStore from tribler.core.components.metadata_store.remote_query_community.settings import RemoteQueryCommunitySettings from tribler.core.components.popularity.community.popularity_community import PopularityCommunity +from tribler.core.components.popularity.rendezvous.db.database import RendezvousDatabase +from tribler.core.components.popularity.rendezvous.rendezvous import RendezvousResponsePayload, RendezvousChallenge from tribler.core.components.torrent_checker.torrent_checker.torrentchecker_session import HealthInfo from tribler.core.tests.tools.base_test import MockObject from tribler.core.utilities.path_util import Path @@ -42,6 +45,26 @@ def _generate_checked_torrents(count: int, status: str = None) -> List[HealthInf class TestPopularityCommunity(TestBase): NUM_NODES = 2 + def initialize(self, overlay_class, node_count, *args, **kwargs): + self.overlay_class = overlay_class + self.nodes = [self.create_node(*args, **kwargs) for _ in range(node_count)] + + # Add nodes to each other. + for node in self.nodes: + for other in self.nodes: + if other == node: + continue + private_peer = other.my_peer + private_peer.new_style_intro = True + public_peer = Peer(private_peer.public_key, private_peer.address) + public_peer.new_style_intro = True + node.network.add_verified_peer(public_peer) + node.network.discover_services(public_peer, [overlay_class.community_id]) + + # Make packet handling fragile. + for i in range(len(self.nodes)): + self.patch_overlays(i) + def setUp(self): super().setUp() self.count = 0 @@ -57,6 +80,7 @@ def create_node(self, *args, **kwargs): mds = MetadataStore(Path(self.temporary_directory()) / f"{self.count}", Path(self.temporary_directory()), default_eccrypto.generate_key("curve25519")) + rdb = RendezvousDatabase(Path(self.temporary_directory()) / f"{self.count + 1}") self.metadata_store_set.add(mds) torrent_checker = MockObject() torrent_checker.torrents_checked = {} @@ -65,6 +89,7 @@ def create_node(self, *args, **kwargs): rqc_settings = RemoteQueryCommunitySettings() return MockIPv8("curve25519", PopularityCommunity, metadata_store=mds, + rendezvous_db=rdb, torrent_checker=torrent_checker, rqc_settings=rqc_settings ) @@ -210,3 +235,61 @@ async def test_skip_torrent_query_back_for_known_torrent(self): await self.init_first_node_and_gossip( HealthInfo(infohash, seeders=200, leechers=0)) self.nodes[1].overlay.send_remote_select.assert_not_called() + + async def test_introduction_rendezvous_payload(self): + await self.introduce_nodes() + await self.deliver_messages() + self.nodes[0].overlay.send_introduction_request(self.nodes[1].my_peer) + await self.deliver_messages() + with db_session: + assert self.nodes[0].overlay.rdb.Certificate.get(public_key=self.nodes[1].my_peer.mid).counter == 1 + + rendezvous_peers = list(self.nodes[1].overlay.rendezvous_cache.get_rendezvous_peers()) + assert rendezvous_peers[0] == self.nodes[0].my_peer + + async def test_introduction_rendezvous_payload_multiple(self): + await self.introduce_nodes() + await self.deliver_messages() + + self.nodes[0].overlay.send_introduction_request(self.nodes[1].my_peer) + await self.deliver_messages() + + number_of_introductions = 4 + for i in range(number_of_introductions): + self.nodes[0].overlay.ping_rendezvous() + await self.deliver_messages() + + with db_session: + assert self.nodes[0].overlay.rdb.Certificate.get( + public_key=self.nodes[1].my_peer.mid).counter == number_of_introductions + 1 + + async def test_invalid_nonce(self): + await self.introduce_nodes() + await self.deliver_messages() + + self.nodes[0].overlay.rendezvous_cache.add_peer(self.nodes[1].my_peer, b'1' * 16) + + payload = self.nodes[1].overlay._create_rendezvous_response(RendezvousChallenge(b'2' * 16)) + self.nodes[1].overlay.ez_send(self.nodes[0].my_peer, payload) + await self.deliver_messages() + + with db_session: + assert self.nodes[0].overlay.rdb.Certificate.get(public_key=self.nodes[1].my_peer.mid) is None + + async def test_invalid_signature(self): + await self.introduce_nodes() + await self.deliver_messages() + + challenge_1 = RendezvousChallenge(b'1' * 16) + challenge_2 = RendezvousChallenge(b'2' * 16) + + self.nodes[0].overlay.rendezvous_cache.add_peer(self.nodes[1].my_peer, challenge_1.nonce) + + payload = self.nodes[1].overlay._create_rendezvous_response(challenge_2) + payload.challenge = challenge_1 + + self.nodes[1].overlay.ez_send(self.nodes[0].my_peer, payload) + await self.deliver_messages() + + with db_session: + assert self.nodes[0].overlay.rdb.Certificate.get(public_key=self.nodes[1].my_peer.mid) is None diff --git a/src/tribler/core/components/popularity/rendezvous/tests/__init__.py b/src/tribler/core/components/popularity/rendezvous/tests/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/tribler/core/components/popularity/rendezvous/tests/test_rendezvous_test.py b/src/tribler/core/components/popularity/rendezvous/tests/test_rendezvous_test.py new file mode 100644 index 00000000000..3c2c8a2339d --- /dev/null +++ b/src/tribler/core/components/popularity/rendezvous/tests/test_rendezvous_test.py @@ -0,0 +1,43 @@ +import time + +from ipv8.keyvault.crypto import default_eccrypto +from ipv8.peer import Peer +from ipv8.test.base import TestBase +from ipv8.test.mocking.endpoint import AutoMockEndpoint + +from tribler.core.components.popularity.rendezvous.rendezvous_cache import RendezvousCache, EMPTY_PEER_CHALLENGE + + +class TestRendezvousCache(TestBase): + NUM_NODES = 3 + + def setUp(self): + super().setUp() + self.peers = [Peer(default_eccrypto.generate_key(u"low")) for _ in range(self.NUM_NODES)] + self._cache = RendezvousCache() + + def test_add_peer(self): + self._cache.add_peer(self.peers[0]) + self._cache.add_peer(self.peers[1]) + self.assertEqual(len(self._cache.get_rendezvous_peers()), 2) + + def test_set_rendezvous_challenge(self): + self._cache.add_peer(self.peers[0]) + self._cache.set_rendezvous_challenge(self.peers[0], b'1234') + self.assertEqual(self._cache.get_rendezvous_challenge(self.peers[0]), b'1234') + + def test_clear_inactive_peers(self): + self._cache.add_peer(self.peers[0]) + self._cache.add_peer(self.peers[1]) + self._cache.add_peer(self.peers[2]) + time.sleep(1) + + self._cache.set_rendezvous_challenge(self.peers[0], b'1234') + self._cache.clear_inactive_peers(1) + + self.assertEqual(len(self._cache.get_rendezvous_peers()), 1) + + def test_clear_peer_challenge(self): + self._cache.add_peer(self.peers[0], b'1234') + self._cache.clear_peer_challenge(self.peers[0]) + assert self._cache.get_rendezvous_challenge(self.peers[0]) == EMPTY_PEER_CHALLENGE From f1120ee6cee99798c06903723d11301fa73fb9be Mon Sep 17 00:00:00 2001 From: Rowdy Mitchell Chotkan Date: Wed, 28 Jun 2023 10:17:58 +0200 Subject: [PATCH 5/9] Move tests to seperate class --- .../tests/test_popularity_community.py | 161 +++++++++++------- 1 file changed, 99 insertions(+), 62 deletions(-) diff --git a/src/tribler/core/components/popularity/community/tests/test_popularity_community.py b/src/tribler/core/components/popularity/community/tests/test_popularity_community.py index fcd7b07b0f9..3003dc3bb41 100644 --- a/src/tribler/core/components/popularity/community/tests/test_popularity_community.py +++ b/src/tribler/core/components/popularity/community/tests/test_popularity_community.py @@ -42,6 +42,105 @@ def _generate_checked_torrents(count: int, status: str = None) -> List[HealthInf return [_generate_single_checked_torrent(status) for _ in range(count)] +class TestRendezvousLogic(TestBase): + NUM_NODES = 3 + + def setUp(self): + super().setUp() + self.count = 0 + self.initialize(PopularityCommunity, self.NUM_NODES) + + def create_node(self, *args, **kwargs): + rdb = RendezvousDatabase(Path(self.temporary_directory()) / f"{self.count + 1}") + torrent_checker = MockObject() + torrent_checker.torrents_checked = {} + mds = MetadataStore(Path(self.temporary_directory()) / f"{self.count}", + Path(self.temporary_directory()), + default_eccrypto.generate_key("curve25519")) + rqc_settings = RemoteQueryCommunitySettings() + + self.count += 1 + return MockIPv8("curve25519", PopularityCommunity, metadata_store=mds, + rendezvous_db=rdb, + torrent_checker=torrent_checker, + rqc_settings=rqc_settings + ) + + async def test_introduction_rendezvous_payload(self): + await self.introduce_nodes() + await self.deliver_messages() + + self.nodes[0].overlay.send_introduction_request(self.nodes[1].my_peer) + await self.deliver_messages() + + self.nodes[0].overlay.send_introduction_request(self.nodes[2].my_peer) + await self.deliver_messages() + + with db_session: + assert self.nodes[0].overlay.rdb.Certificate.get(public_key=self.nodes[1].my_peer.mid).counter == 1 + assert self.nodes[0].overlay.rdb.Certificate.get(public_key=self.nodes[2].my_peer.mid).counter == 1 + + # Check if the rendezvous cache is updated + rendezvous_peers = list(self.nodes[1].overlay.rendezvous_cache.get_rendezvous_peers()) + assert rendezvous_peers[0] == self.nodes[0].my_peer + rendezvous_peers = list(self.nodes[2].overlay.rendezvous_cache.get_rendezvous_peers()) + assert rendezvous_peers[0] == self.nodes[0].my_peer + + async def test_rendezvous_payloads(self): + await self.introduce_nodes() + await self.deliver_messages() + + self.nodes[0].overlay.send_introduction_request(self.nodes[1].my_peer) + self.nodes[0].overlay.send_introduction_request(self.nodes[2].my_peer) + await self.deliver_messages() + + number_of_rendezvous = 4 + for i in range(number_of_rendezvous): + for j in range(self.count): + self.nodes[j].overlay.ping_rendezvous() + await self.deliver_messages() + + with db_session: + # Peer 0 should have a counter of 1 more + assert self.nodes[0].overlay.rdb.Certificate.get( + public_key=self.nodes[1].my_peer.mid).counter == number_of_rendezvous + 1 + assert self.nodes[1].overlay.rdb.Certificate.get( + public_key=self.nodes[0].my_peer.mid).counter == number_of_rendezvous + assert self.nodes[2].overlay.rdb.Certificate.get( + public_key=self.nodes[0].my_peer.mid).counter == number_of_rendezvous + + async def test_invalid_nonce(self): + await self.introduce_nodes() + await self.deliver_messages() + + self.nodes[0].overlay.rendezvous_cache.add_peer(self.nodes[1].my_peer, b'1' * 16) + + payload = self.nodes[1].overlay._create_rendezvous_response(RendezvousChallenge(b'2' * 16)) + self.nodes[1].overlay.ez_send(self.nodes[0].my_peer, payload) + await self.deliver_messages() + + with db_session: + assert self.nodes[0].overlay.rdb.Certificate.get(public_key=self.nodes[1].my_peer.mid) is None + + async def test_invalid_signature(self): + await self.introduce_nodes() + await self.deliver_messages() + + challenge_1 = RendezvousChallenge(b'1' * 16) + challenge_2 = RendezvousChallenge(b'2' * 16) + + self.nodes[0].overlay.rendezvous_cache.add_peer(self.nodes[1].my_peer, challenge_1.nonce) + + payload = self.nodes[1].overlay._create_rendezvous_response(challenge_2) + payload.challenge = challenge_1 + + self.nodes[1].overlay.ez_send(self.nodes[0].my_peer, payload) + await self.deliver_messages() + + with db_session: + assert self.nodes[0].overlay.rdb.Certificate.get(public_key=self.nodes[1].my_peer.mid) is None + + class TestPopularityCommunity(TestBase): NUM_NODES = 2 @@ -55,9 +154,7 @@ def initialize(self, overlay_class, node_count, *args, **kwargs): if other == node: continue private_peer = other.my_peer - private_peer.new_style_intro = True public_peer = Peer(private_peer.public_key, private_peer.address) - public_peer.new_style_intro = True node.network.add_verified_peer(public_peer) node.network.discover_services(public_peer, [overlay_class.community_id]) @@ -80,7 +177,6 @@ def create_node(self, *args, **kwargs): mds = MetadataStore(Path(self.temporary_directory()) / f"{self.count}", Path(self.temporary_directory()), default_eccrypto.generate_key("curve25519")) - rdb = RendezvousDatabase(Path(self.temporary_directory()) / f"{self.count + 1}") self.metadata_store_set.add(mds) torrent_checker = MockObject() torrent_checker.torrents_checked = {} @@ -89,7 +185,6 @@ def create_node(self, *args, **kwargs): rqc_settings = RemoteQueryCommunitySettings() return MockIPv8("curve25519", PopularityCommunity, metadata_store=mds, - rendezvous_db=rdb, torrent_checker=torrent_checker, rqc_settings=rqc_settings ) @@ -235,61 +330,3 @@ async def test_skip_torrent_query_back_for_known_torrent(self): await self.init_first_node_and_gossip( HealthInfo(infohash, seeders=200, leechers=0)) self.nodes[1].overlay.send_remote_select.assert_not_called() - - async def test_introduction_rendezvous_payload(self): - await self.introduce_nodes() - await self.deliver_messages() - self.nodes[0].overlay.send_introduction_request(self.nodes[1].my_peer) - await self.deliver_messages() - with db_session: - assert self.nodes[0].overlay.rdb.Certificate.get(public_key=self.nodes[1].my_peer.mid).counter == 1 - - rendezvous_peers = list(self.nodes[1].overlay.rendezvous_cache.get_rendezvous_peers()) - assert rendezvous_peers[0] == self.nodes[0].my_peer - - async def test_introduction_rendezvous_payload_multiple(self): - await self.introduce_nodes() - await self.deliver_messages() - - self.nodes[0].overlay.send_introduction_request(self.nodes[1].my_peer) - await self.deliver_messages() - - number_of_introductions = 4 - for i in range(number_of_introductions): - self.nodes[0].overlay.ping_rendezvous() - await self.deliver_messages() - - with db_session: - assert self.nodes[0].overlay.rdb.Certificate.get( - public_key=self.nodes[1].my_peer.mid).counter == number_of_introductions + 1 - - async def test_invalid_nonce(self): - await self.introduce_nodes() - await self.deliver_messages() - - self.nodes[0].overlay.rendezvous_cache.add_peer(self.nodes[1].my_peer, b'1' * 16) - - payload = self.nodes[1].overlay._create_rendezvous_response(RendezvousChallenge(b'2' * 16)) - self.nodes[1].overlay.ez_send(self.nodes[0].my_peer, payload) - await self.deliver_messages() - - with db_session: - assert self.nodes[0].overlay.rdb.Certificate.get(public_key=self.nodes[1].my_peer.mid) is None - - async def test_invalid_signature(self): - await self.introduce_nodes() - await self.deliver_messages() - - challenge_1 = RendezvousChallenge(b'1' * 16) - challenge_2 = RendezvousChallenge(b'2' * 16) - - self.nodes[0].overlay.rendezvous_cache.add_peer(self.nodes[1].my_peer, challenge_1.nonce) - - payload = self.nodes[1].overlay._create_rendezvous_response(challenge_2) - payload.challenge = challenge_1 - - self.nodes[1].overlay.ez_send(self.nodes[0].my_peer, payload) - await self.deliver_messages() - - with db_session: - assert self.nodes[0].overlay.rdb.Certificate.get(public_key=self.nodes[1].my_peer.mid) is None From 427f6c5a118ea504ca464c8787a507c8d415e9b2 Mon Sep 17 00:00:00 2001 From: Rowdy Mitchell Chotkan Date: Wed, 28 Jun 2023 13:25:33 +0200 Subject: [PATCH 6/9] Use db name const + move tests --- .../community/popularity_community.py | 2 +- .../tests/test_popularity_community.py | 198 +++++++++--------- .../popularity/popularity_component.py | 2 +- 3 files changed, 101 insertions(+), 101 deletions(-) diff --git a/src/tribler/core/components/popularity/community/popularity_community.py b/src/tribler/core/components/popularity/community/popularity_community.py index ffac225333d..740a397c0f3 100644 --- a/src/tribler/core/components/popularity/community/popularity_community.py +++ b/src/tribler/core/components/popularity/community/popularity_community.py @@ -43,7 +43,7 @@ class PopularityCommunity(RemoteQueryCommunity, VersionCommunityMixin): GOSSIP_RANDOM_TORRENT_COUNT = 10 PING_INTERVAL_RENDEZVOUS = 60 # seconds - DB_NAME = 'rendezvous.db' + RENDEZVOUS_DB_NAME = 'rendezvous.db' community_id = unhexlify('9aca62f878969c437da9844cba29a134917e1649') diff --git a/src/tribler/core/components/popularity/community/tests/test_popularity_community.py b/src/tribler/core/components/popularity/community/tests/test_popularity_community.py index 3003dc3bb41..5bdbab7523a 100644 --- a/src/tribler/core/components/popularity/community/tests/test_popularity_community.py +++ b/src/tribler/core/components/popularity/community/tests/test_popularity_community.py @@ -42,105 +42,6 @@ def _generate_checked_torrents(count: int, status: str = None) -> List[HealthInf return [_generate_single_checked_torrent(status) for _ in range(count)] -class TestRendezvousLogic(TestBase): - NUM_NODES = 3 - - def setUp(self): - super().setUp() - self.count = 0 - self.initialize(PopularityCommunity, self.NUM_NODES) - - def create_node(self, *args, **kwargs): - rdb = RendezvousDatabase(Path(self.temporary_directory()) / f"{self.count + 1}") - torrent_checker = MockObject() - torrent_checker.torrents_checked = {} - mds = MetadataStore(Path(self.temporary_directory()) / f"{self.count}", - Path(self.temporary_directory()), - default_eccrypto.generate_key("curve25519")) - rqc_settings = RemoteQueryCommunitySettings() - - self.count += 1 - return MockIPv8("curve25519", PopularityCommunity, metadata_store=mds, - rendezvous_db=rdb, - torrent_checker=torrent_checker, - rqc_settings=rqc_settings - ) - - async def test_introduction_rendezvous_payload(self): - await self.introduce_nodes() - await self.deliver_messages() - - self.nodes[0].overlay.send_introduction_request(self.nodes[1].my_peer) - await self.deliver_messages() - - self.nodes[0].overlay.send_introduction_request(self.nodes[2].my_peer) - await self.deliver_messages() - - with db_session: - assert self.nodes[0].overlay.rdb.Certificate.get(public_key=self.nodes[1].my_peer.mid).counter == 1 - assert self.nodes[0].overlay.rdb.Certificate.get(public_key=self.nodes[2].my_peer.mid).counter == 1 - - # Check if the rendezvous cache is updated - rendezvous_peers = list(self.nodes[1].overlay.rendezvous_cache.get_rendezvous_peers()) - assert rendezvous_peers[0] == self.nodes[0].my_peer - rendezvous_peers = list(self.nodes[2].overlay.rendezvous_cache.get_rendezvous_peers()) - assert rendezvous_peers[0] == self.nodes[0].my_peer - - async def test_rendezvous_payloads(self): - await self.introduce_nodes() - await self.deliver_messages() - - self.nodes[0].overlay.send_introduction_request(self.nodes[1].my_peer) - self.nodes[0].overlay.send_introduction_request(self.nodes[2].my_peer) - await self.deliver_messages() - - number_of_rendezvous = 4 - for i in range(number_of_rendezvous): - for j in range(self.count): - self.nodes[j].overlay.ping_rendezvous() - await self.deliver_messages() - - with db_session: - # Peer 0 should have a counter of 1 more - assert self.nodes[0].overlay.rdb.Certificate.get( - public_key=self.nodes[1].my_peer.mid).counter == number_of_rendezvous + 1 - assert self.nodes[1].overlay.rdb.Certificate.get( - public_key=self.nodes[0].my_peer.mid).counter == number_of_rendezvous - assert self.nodes[2].overlay.rdb.Certificate.get( - public_key=self.nodes[0].my_peer.mid).counter == number_of_rendezvous - - async def test_invalid_nonce(self): - await self.introduce_nodes() - await self.deliver_messages() - - self.nodes[0].overlay.rendezvous_cache.add_peer(self.nodes[1].my_peer, b'1' * 16) - - payload = self.nodes[1].overlay._create_rendezvous_response(RendezvousChallenge(b'2' * 16)) - self.nodes[1].overlay.ez_send(self.nodes[0].my_peer, payload) - await self.deliver_messages() - - with db_session: - assert self.nodes[0].overlay.rdb.Certificate.get(public_key=self.nodes[1].my_peer.mid) is None - - async def test_invalid_signature(self): - await self.introduce_nodes() - await self.deliver_messages() - - challenge_1 = RendezvousChallenge(b'1' * 16) - challenge_2 = RendezvousChallenge(b'2' * 16) - - self.nodes[0].overlay.rendezvous_cache.add_peer(self.nodes[1].my_peer, challenge_1.nonce) - - payload = self.nodes[1].overlay._create_rendezvous_response(challenge_2) - payload.challenge = challenge_1 - - self.nodes[1].overlay.ez_send(self.nodes[0].my_peer, payload) - await self.deliver_messages() - - with db_session: - assert self.nodes[0].overlay.rdb.Certificate.get(public_key=self.nodes[1].my_peer.mid) is None - - class TestPopularityCommunity(TestBase): NUM_NODES = 2 @@ -330,3 +231,102 @@ async def test_skip_torrent_query_back_for_known_torrent(self): await self.init_first_node_and_gossip( HealthInfo(infohash, seeders=200, leechers=0)) self.nodes[1].overlay.send_remote_select.assert_not_called() + + +class TestRendezvousLogic(TestBase): + NUM_NODES = 3 + + def setUp(self): + super().setUp() + self.count = 0 + self.initialize(PopularityCommunity, self.NUM_NODES) + + def create_node(self, *args, **kwargs): + rdb = RendezvousDatabase(Path(self.temporary_directory()) / f"{self.count + 1}") + torrent_checker = MockObject() + torrent_checker.torrents_checked = {} + mds = MetadataStore(Path(self.temporary_directory()) / f"{self.count}", + Path(self.temporary_directory()), + default_eccrypto.generate_key("curve25519")) + rqc_settings = RemoteQueryCommunitySettings() + + self.count += 1 + return MockIPv8("curve25519", PopularityCommunity, metadata_store=mds, + rendezvous_db=rdb, + torrent_checker=torrent_checker, + rqc_settings=rqc_settings + ) + + async def test_introduction_rendezvous_payload(self): + await self.introduce_nodes() + await self.deliver_messages() + + self.nodes[0].overlay.send_introduction_request(self.nodes[1].my_peer) + await self.deliver_messages() + + self.nodes[0].overlay.send_introduction_request(self.nodes[2].my_peer) + await self.deliver_messages() + + with db_session: + assert self.nodes[0].overlay.rdb.Certificate.get(public_key=self.nodes[1].my_peer.mid).counter == 1 + assert self.nodes[0].overlay.rdb.Certificate.get(public_key=self.nodes[2].my_peer.mid).counter == 1 + + # Check if the rendezvous cache is updated + rendezvous_peers = list(self.nodes[1].overlay.rendezvous_cache.get_rendezvous_peers()) + assert rendezvous_peers[0] == self.nodes[0].my_peer + rendezvous_peers = list(self.nodes[2].overlay.rendezvous_cache.get_rendezvous_peers()) + assert rendezvous_peers[0] == self.nodes[0].my_peer + + async def test_rendezvous_payloads(self): + await self.introduce_nodes() + await self.deliver_messages() + + self.nodes[0].overlay.send_introduction_request(self.nodes[1].my_peer) + self.nodes[0].overlay.send_introduction_request(self.nodes[2].my_peer) + await self.deliver_messages() + + number_of_rendezvous = 4 + for i in range(number_of_rendezvous): + for j in range(self.count): + self.nodes[j].overlay.ping_rendezvous() + await self.deliver_messages() + + with db_session: + # Peer 0 should have a counter of 1 more + assert self.nodes[0].overlay.rdb.Certificate.get( + public_key=self.nodes[1].my_peer.mid).counter == number_of_rendezvous + 1 + assert self.nodes[1].overlay.rdb.Certificate.get( + public_key=self.nodes[0].my_peer.mid).counter == number_of_rendezvous + assert self.nodes[2].overlay.rdb.Certificate.get( + public_key=self.nodes[0].my_peer.mid).counter == number_of_rendezvous + + async def test_invalid_nonce(self): + await self.introduce_nodes() + await self.deliver_messages() + + self.nodes[0].overlay.rendezvous_cache.add_peer(self.nodes[1].my_peer, b'1' * 16) + + payload = self.nodes[1].overlay._create_rendezvous_response(RendezvousChallenge(b'2' * 16)) + self.nodes[1].overlay.ez_send(self.nodes[0].my_peer, payload) + await self.deliver_messages() + + with db_session: + assert self.nodes[0].overlay.rdb.Certificate.get(public_key=self.nodes[1].my_peer.mid) is None + + async def test_invalid_signature(self): + await self.introduce_nodes() + await self.deliver_messages() + + challenge_1 = RendezvousChallenge(b'1' * 16) + challenge_2 = RendezvousChallenge(b'2' * 16) + + self.nodes[0].overlay.rendezvous_cache.add_peer(self.nodes[1].my_peer, challenge_1.nonce) + + payload = self.nodes[1].overlay._create_rendezvous_response(challenge_2) + payload.challenge = challenge_1 + + self.nodes[1].overlay.ez_send(self.nodes[0].my_peer, payload) + await self.deliver_messages() + + with db_session: + assert self.nodes[0].overlay.rdb.Certificate.get(public_key=self.nodes[1].my_peer.mid) is None diff --git a/src/tribler/core/components/popularity/popularity_component.py b/src/tribler/core/components/popularity/popularity_component.py index 9cf44d3a670..2845193f05c 100644 --- a/src/tribler/core/components/popularity/popularity_component.py +++ b/src/tribler/core/components/popularity/popularity_component.py @@ -24,7 +24,7 @@ async def run(self): metadata_store_component = await self.require_component(MetadataStoreComponent) torrent_checker_component = await self.require_component(TorrentCheckerComponent) - rendezvous_db = RendezvousDatabase(db_path=self.session.config.state_dir / STATEDIR_DB_DIR / "rendezvous.db") + rendezvous_db = RendezvousDatabase(db_path=self.session.config.state_dir / STATEDIR_DB_DIR / PopularityCommunity.RENDEZVOUS_DB_NAME) config = self.session.config community = PopularityCommunity(self._ipv8_component.peer, From 263b293d4a750fb56b7730604b29edc0cc9267b7 Mon Sep 17 00:00:00 2001 From: Rowdy Mitchell Chotkan Date: Wed, 28 Jun 2023 13:30:34 +0200 Subject: [PATCH 7/9] Revert "Upgrade PyQt, Yarl, and LibTorrent dependencies" This reverts commit 91d360a0f7dff1c3bf005010cb6d7bb6f19d890f. --- requirements-core.txt | 4 ++-- requirements.txt | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/requirements-core.txt b/requirements-core.txt index 3e52df621f6..f5a734f0ca8 100644 --- a/requirements-core.txt +++ b/requirements-core.txt @@ -20,10 +20,10 @@ pyyaml==6.0 sentry-sdk==1.14.0 service-identity==21.1.0 yappi==1.4.0 -yarl==1.8.2 # keep this dependency higher than 1.6.3. See: https://github.com/aio-libs/yarl/issues/517 +yarl==1.7.2 # keep this dependency higher than 1.6.3. See: https://github.com/aio-libs/yarl/issues/517 bitarray==2.5.1 pyipv8==2.10.0 -libtorrent==1.2.19 +libtorrent==1.2.15 file-read-backwards==2.0.0 Brotli==1.0.9 # to prevent AttributeError on macOs: module 'brotli' has no attribute 'error' (in urllib3.response) human-readable==1.3.2 diff --git a/requirements.txt b/requirements.txt index c3f311c6ba2..0085760f29e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,6 @@ Pillow==9.3.0 PyQt5==5.15.1 -PyQt5-sip==12.11.1 +PyQt5-sip==12.8.1 pyqtgraph==0.12.3 PyQtWebEngine==5.15.2 From eb0b667d9fed9ac67e6031d5fad16d214f66b9ed Mon Sep 17 00:00:00 2001 From: Rowdy Mitchell Chotkan Date: Wed, 28 Jun 2023 13:35:06 +0200 Subject: [PATCH 8/9] Remove unnecessary override --- .../tests/test_popularity_community.py | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/src/tribler/core/components/popularity/community/tests/test_popularity_community.py b/src/tribler/core/components/popularity/community/tests/test_popularity_community.py index 5bdbab7523a..2a78ab6b560 100644 --- a/src/tribler/core/components/popularity/community/tests/test_popularity_community.py +++ b/src/tribler/core/components/popularity/community/tests/test_popularity_community.py @@ -45,24 +45,6 @@ def _generate_checked_torrents(count: int, status: str = None) -> List[HealthInf class TestPopularityCommunity(TestBase): NUM_NODES = 2 - def initialize(self, overlay_class, node_count, *args, **kwargs): - self.overlay_class = overlay_class - self.nodes = [self.create_node(*args, **kwargs) for _ in range(node_count)] - - # Add nodes to each other. - for node in self.nodes: - for other in self.nodes: - if other == node: - continue - private_peer = other.my_peer - public_peer = Peer(private_peer.public_key, private_peer.address) - node.network.add_verified_peer(public_peer) - node.network.discover_services(public_peer, [overlay_class.community_id]) - - # Make packet handling fragile. - for i in range(len(self.nodes)): - self.patch_overlays(i) - def setUp(self): super().setUp() self.count = 0 From 6dc9d4c924556b2984a3e60ba9eff85fa01ee7e5 Mon Sep 17 00:00:00 2001 From: Rowdy Mitchell Chotkan Date: Wed, 16 Aug 2023 13:26:45 +0200 Subject: [PATCH 9/9] Address requested changes and comments --- .../popularity/community/popularity_community.py | 16 +++++++--------- .../community/tests/test_popularity_community.py | 5 ++--- .../popularity/popularity_component.py | 11 ++++++++--- .../popularity/rendezvous/db/database.py | 3 +++ .../rendezvous/db/orm_bindings/certificate.py | 5 ----- ...ndezvous_test.py => test_rendezvous_cache.py} | 1 - 6 files changed, 20 insertions(+), 21 deletions(-) rename src/tribler/core/components/popularity/rendezvous/tests/{test_rendezvous_test.py => test_rendezvous_cache.py} (96%) diff --git a/src/tribler/core/components/popularity/community/popularity_community.py b/src/tribler/core/components/popularity/community/popularity_community.py index 740a397c0f3..8ccf76e6750 100644 --- a/src/tribler/core/components/popularity/community/popularity_community.py +++ b/src/tribler/core/components/popularity/community/popularity_community.py @@ -43,9 +43,8 @@ class PopularityCommunity(RemoteQueryCommunity, VersionCommunityMixin): GOSSIP_RANDOM_TORRENT_COUNT = 10 PING_INTERVAL_RENDEZVOUS = 60 # seconds - RENDEZVOUS_DB_NAME = 'rendezvous.db' - community_id = unhexlify('9aca62f878969c437da9844cba29a134917e1649') + community_id = unhexlify('9aca62f878969c437da9844cba29a134917e1648') def __init__(self, *args, torrent_checker=None, rendezvous_db=None, **kwargs): # Creating a separate instance of Network for this community to find more peers @@ -62,9 +61,9 @@ def __init__(self, *args, torrent_checker=None, rendezvous_db=None, **kwargs): self.logger.info('Popularity Community initialized (peer mid %s)', hexlify(self.my_peer.mid)) self.register_task("gossip_random_torrents", self.gossip_random_torrents_health, - interval=PopularityCommunity.GOSSIP_INTERVAL_FOR_RANDOM_TORRENTS) + interval=self.GOSSIP_INTERVAL_FOR_RANDOM_TORRENTS) self.register_task("ping_rendezvous", self.ping_rendezvous, - interval=PopularityCommunity.PING_INTERVAL_RENDEZVOUS) + interval=self.PING_INTERVAL_RENDEZVOUS) # Init version community message handlers self.init_version_community() @@ -255,14 +254,13 @@ def _handle_rendezvous_response(self, peer, raw_payload: RawRendezvousResponsePa if not self.crypto.is_valid_signature(peer.key, raw_payload.challenge, signature.signature): self.logger.warning(f"Received invalid signature from {peer.mid}") return - else: - # This nonce has been burned. - self.rendezvous_cache.clear_peer_challenge(peer) + + # This nonce has been burned. + self.rendezvous_cache.clear_peer_challenge(peer) self.logger.debug(f"Received valid rendezvous response from {peer.mid}") - with db_session: + with db_session(immediate=True): certificate = self.rdb.Certificate.get(public_key=peer.mid) if not certificate: certificate = self.rdb.Certificate(public_key=peer.mid, counter=0) certificate.counter += 1 - return diff --git a/src/tribler/core/components/popularity/community/tests/test_popularity_community.py b/src/tribler/core/components/popularity/community/tests/test_popularity_community.py index 2a78ab6b560..a71fcf2936f 100644 --- a/src/tribler/core/components/popularity/community/tests/test_popularity_community.py +++ b/src/tribler/core/components/popularity/community/tests/test_popularity_community.py @@ -4,7 +4,6 @@ from unittest.mock import Mock from ipv8.keyvault.crypto import default_eccrypto -from ipv8.peer import Peer from ipv8.test.base import TestBase from ipv8.test.mocking.ipv8 import MockIPv8 from pony.orm import db_session @@ -13,7 +12,7 @@ from tribler.core.components.metadata_store.remote_query_community.settings import RemoteQueryCommunitySettings from tribler.core.components.popularity.community.popularity_community import PopularityCommunity from tribler.core.components.popularity.rendezvous.db.database import RendezvousDatabase -from tribler.core.components.popularity.rendezvous.rendezvous import RendezvousResponsePayload, RendezvousChallenge +from tribler.core.components.popularity.rendezvous.rendezvous import RendezvousChallenge from tribler.core.components.torrent_checker.torrent_checker.torrentchecker_session import HealthInfo from tribler.core.tests.tools.base_test import MockObject from tribler.core.utilities.path_util import Path @@ -268,7 +267,7 @@ async def test_rendezvous_payloads(self): await self.deliver_messages() number_of_rendezvous = 4 - for i in range(number_of_rendezvous): + for _ in range(number_of_rendezvous): for j in range(self.count): self.nodes[j].overlay.ping_rendezvous() await self.deliver_messages() diff --git a/src/tribler/core/components/popularity/popularity_component.py b/src/tribler/core/components/popularity/popularity_component.py index 2845193f05c..1e1d86e95aa 100644 --- a/src/tribler/core/components/popularity/popularity_component.py +++ b/src/tribler/core/components/popularity/popularity_component.py @@ -12,8 +12,10 @@ class PopularityComponent(Component): - community: PopularityCommunity = None + RENDEZVOUS_DB_NAME = 'rendezvous.db' + community: PopularityCommunity = None + rendezvous_db: RendezvousDatabase = None _ipv8_component: Ipv8Component = None async def run(self): @@ -24,7 +26,8 @@ async def run(self): metadata_store_component = await self.require_component(MetadataStoreComponent) torrent_checker_component = await self.require_component(TorrentCheckerComponent) - rendezvous_db = RendezvousDatabase(db_path=self.session.config.state_dir / STATEDIR_DB_DIR / PopularityCommunity.RENDEZVOUS_DB_NAME) + self.rendezvous_db = RendezvousDatabase( + db_path=self.session.config.state_dir / STATEDIR_DB_DIR / self.RENDEZVOUS_DB_NAME) config = self.session.config community = PopularityCommunity(self._ipv8_component.peer, @@ -33,7 +36,7 @@ async def run(self): settings=config.popularity_community, rqc_settings=config.remote_query_community, metadata_store=metadata_store_component.mds, - rendezvous_db=rendezvous_db, + rendezvous_db=self.rendezvous_db, torrent_checker=torrent_checker_component.torrent_checker) self.community = community @@ -44,3 +47,5 @@ async def shutdown(self): await super().shutdown() if self._ipv8_component and self.community: await self._ipv8_component.unload_community(self.community) + if self.rendezvous_db: + self.rendezvous_db.shutdown() diff --git a/src/tribler/core/components/popularity/rendezvous/db/database.py b/src/tribler/core/components/popularity/rendezvous/db/database.py index 6005993f8bc..db3a7003bc1 100644 --- a/src/tribler/core/components/popularity/rendezvous/db/database.py +++ b/src/tribler/core/components/popularity/rendezvous/db/database.py @@ -31,3 +31,6 @@ def __init__(self, db_path: Union[Path, type(MEMORY_DB)]): if create_db: with db_session: self.MiscData(name="db_version", value=str(self.DB_VERSION)) + + def shutdown(self) -> None: + self.database.disconnect() diff --git a/src/tribler/core/components/popularity/rendezvous/db/orm_bindings/certificate.py b/src/tribler/core/components/popularity/rendezvous/db/orm_bindings/certificate.py index c5c73029923..4f6821c5868 100644 --- a/src/tribler/core/components/popularity/rendezvous/db/orm_bindings/certificate.py +++ b/src/tribler/core/components/popularity/rendezvous/db/orm_bindings/certificate.py @@ -6,9 +6,4 @@ class RendezvousCertificate(db.Entity): public_key = Required(bytes, index=True) counter = Required(int) - @classmethod - @db_session - def get_count(cls, pk: bytes) -> int: - return RendezvousCertificate.get(public_key == pk).count() - return RendezvousCertificate diff --git a/src/tribler/core/components/popularity/rendezvous/tests/test_rendezvous_test.py b/src/tribler/core/components/popularity/rendezvous/tests/test_rendezvous_cache.py similarity index 96% rename from src/tribler/core/components/popularity/rendezvous/tests/test_rendezvous_test.py rename to src/tribler/core/components/popularity/rendezvous/tests/test_rendezvous_cache.py index 3c2c8a2339d..b23162e19f0 100644 --- a/src/tribler/core/components/popularity/rendezvous/tests/test_rendezvous_test.py +++ b/src/tribler/core/components/popularity/rendezvous/tests/test_rendezvous_cache.py @@ -3,7 +3,6 @@ from ipv8.keyvault.crypto import default_eccrypto from ipv8.peer import Peer from ipv8.test.base import TestBase -from ipv8.test.mocking.endpoint import AutoMockEndpoint from tribler.core.components.popularity.rendezvous.rendezvous_cache import RendezvousCache, EMPTY_PEER_CHALLENGE