Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rendezvous Certificates #7517

Closed
wants to merge 9 commits into from
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,18 @@
from typing import List, TYPE_CHECKING

from ipv8.lazy_community import lazy_wrapper
from ipv8.messaging.interfaces.udp.endpoint import UDPv4Address, UDPv4LANAddress
from ipv8.messaging.serialization import PackError
from pony.orm import db_session

from tribler.core.components.metadata_store.remote_query_community.remote_query_community import RemoteQueryCommunity
from tribler.core.components.popularity.community.payload import PopularTorrentsRequest, TorrentsHealthPayload
from tribler.core.components.popularity.community.version_community_mixin import VersionCommunityMixin
from tribler.core.components.popularity.rendezvous.db.database import RendezvousDatabase
from tribler.core.components.popularity.rendezvous.rendezvous import RendezvousRequestPayload, \
RendezvousResponsePayload, RawRendezvousResponsePayload, \
RendezvousChallenge, RendezvousSignature
from tribler.core.components.popularity.rendezvous.rendezvous_cache import RendezvousCache, EMPTY_PEER_CHALLENGE
from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo
from tribler.core.utilities.pony_utils import run_threaded
from tribler.core.utilities.unicode import hexlify
Expand All @@ -35,22 +42,97 @@
GOSSIP_POPULAR_TORRENT_COUNT = 10
GOSSIP_RANDOM_TORRENT_COUNT = 10

PING_INTERVAL_RENDEZVOUS = 60 # seconds

community_id = unhexlify('9aca62f878969c437da9844cba29a134917e1648')

def __init__(self, *args, torrent_checker=None, **kwargs):
def __init__(self, *args, torrent_checker=None, rendezvous_db=None, **kwargs):
# Creating a separate instance of Network for this community to find more peers
super().__init__(*args, **kwargs)

self.rdb: RendezvousDatabase = rendezvous_db
self.torrent_checker: TorrentChecker = torrent_checker

self.add_message_handler(TorrentsHealthPayload, self.on_torrents_health)
self.add_message_handler(PopularTorrentsRequest, self.on_popular_torrents_request)

self.add_message_handler(RendezvousRequestPayload, self.on_rendezvous_request)
self.add_message_handler(RendezvousResponsePayload, self.on_rendezvous_response)

self.logger.info('Popularity Community initialized (peer mid %s)', hexlify(self.my_peer.mid))
self.register_task("gossip_random_torrents", self.gossip_random_torrents_health,
interval=PopularityCommunity.GOSSIP_INTERVAL_FOR_RANDOM_TORRENTS)
interval=self.GOSSIP_INTERVAL_FOR_RANDOM_TORRENTS)
self.register_task("ping_rendezvous", self.ping_rendezvous,
interval=self.PING_INTERVAL_RENDEZVOUS)

# Init version community message handlers
self.init_version_community()
self.rendezvous_cache = RendezvousCache()

def send_introduction_request(self, peer):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps the logic for rendezvous_request could be relocated to the on_introduction_response function. This could simplify interactions by eliminating the need to extend introduction_request. As a bonus, this change might allow us to retain the previous community ID.

    def on_introduction_response(self, peer, dist, payload):
        super().on_introduction_response(peer, dist, payload)
        ...  # preform rendezvous_request

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current approach is still compatible! Older peers will just ignore the extra bytes. I could drop this entire extra logic though. We can get it to work through only separate payloads.

rendezvous_request = self._create_rendezvous_request()
extra_payload = self.serializer.pack_serializable(rendezvous_request)
self.logger.debug("Piggy-backing Rendezvous to %s:%d", peer.address[0], peer.address[1])
packet = self.create_introduction_request(peer.address, extra_bytes=extra_payload,
new_style=peer.new_style_intro)
self.endpoint.send(peer.address, packet)
self.rendezvous_cache.add_peer(peer, rendezvous_request.challenge.nonce)

# We override this method to add the rendezvous certificate to the introduction request
def on_introduction_request(self, peer, dist, payload):
if 0 <= self.max_peers < len(self.get_peers()):
self.logger.debug("Dropping introduction request from (%s, %d): too many peers!",

Check warning on line 84 in src/tribler/core/components/popularity/community/popularity_community.py

View check run for this annotation

Codecov / codecov/patch

src/tribler/core/components/popularity/community/popularity_community.py#L84

Added line #L84 was not covered by tests
peer.address[0], peer.address[1])
return

Check warning on line 86 in src/tribler/core/components/popularity/community/popularity_community.py

View check run for this annotation

Codecov / codecov/patch

src/tribler/core/components/popularity/community/popularity_community.py#L86

Added line #L86 was not covered by tests

extra_payload = b''
if payload.extra_bytes:
self.logger.debug("Received introduction request with extra bytes")
try:
rendezvous_request, _ = self.serializer.unpack_serializable(RendezvousRequestPayload,
payload.extra_bytes)
rendezvous_response = self._create_rendezvous_response(rendezvous_request.challenge)
# As we are sending the rendezvous response, we know this peer is interested in rendezvous.
self.rendezvous_cache.add_peer(peer)
extra_payload = self.serializer.pack_serializable(rendezvous_response)
except PackError as e:
self.logger.warning("Failed to unpack RendezvousRequestPayload: %s", e)

Check warning on line 99 in src/tribler/core/components/popularity/community/popularity_community.py

View check run for this annotation

Codecov / codecov/patch

src/tribler/core/components/popularity/community/popularity_community.py#L98-L99

Added lines #L98 - L99 were not covered by tests

if isinstance(payload.source_lan_address, UDPv4Address):
peer.address = UDPv4LANAddress(*payload.source_lan_address)
self.network.add_verified_peer(peer)
self.network.discover_services(peer, [self.community_id, ])

packet = self.create_introduction_response(payload.destination_address, peer.address, payload.identifier,
extra_bytes=extra_payload, new_style=peer.new_style_intro)

self.endpoint.send(peer.address, packet)
self.introduction_request_callback(peer, dist, payload)

@lazy_wrapper(RendezvousRequestPayload)
def on_rendezvous_request(self, peer, payload: RendezvousRequestPayload):
self.logger.debug("Received rendezvous request from %s:%d", peer.address[0], peer.address[1])
# As we are sending the rendezvous response, we know this peer is interested in rendezvous.
self.rendezvous_cache.add_peer(peer)
rendezvous_response = self._create_rendezvous_response(payload.challenge)
self.ez_send(peer, rendezvous_response)

@lazy_wrapper(RawRendezvousResponsePayload)
def on_rendezvous_response(self, peer, payload: RawRendezvousResponsePayload):
self.logger.debug("Received rendezvous response from %s:%d", peer.address[0], peer.address[1])
self._handle_rendezvous_response(peer, payload)

def introduction_response_callback(self, peer, dist, payload):
super().introduction_response_callback(peer, dist, payload)
if payload.extra_bytes:
self.logger.debug("Received introduction response with extra bytes")
try:
raw_rendezvous_response, _ = self.serializer.unpack_serializable(RawRendezvousResponsePayload,
payload.extra_bytes)
self._handle_rendezvous_response(peer, raw_rendezvous_response)

except PackError as e:
self.logger.warning("Failed to unpack RendezvousResponsePayload: %s", e)

Check warning on line 135 in src/tribler/core/components/popularity/community/popularity_community.py

View check run for this annotation

Codecov / codecov/patch

src/tribler/core/components/popularity/community/popularity_community.py#L134-L135

Added lines #L134 - L135 were not covered by tests

def introduction_request_callback(self, peer, dist, payload):
super().introduction_request_callback(peer, dist, payload)
Expand All @@ -76,6 +158,15 @@

self.ez_send(random_peer, TorrentsHealthPayload.create(random_torrents, {}))

def ping_rendezvous(self):
# Remove peers that haven't replied in a while.
self.rendezvous_cache.clear_inactive_peers()

for peer in self.rendezvous_cache.get_rendezvous_peers():
payload = self._create_rendezvous_request()
self.rendezvous_cache.set_rendezvous_challenge(peer, payload.challenge.nonce)
self.ez_send(peer, payload)

@lazy_wrapper(TorrentsHealthPayload)
async def on_torrents_health(self, peer, payload):
self.logger.debug(f"Received torrent health information for "
Expand Down Expand Up @@ -140,3 +231,36 @@

random_torrents = random.sample(checked_and_alive, num_torrents_to_send)
return random_torrents

def _create_rendezvous_request(self) -> RendezvousRequestPayload:
challenge = RendezvousChallenge.create()
payload = RendezvousRequestPayload(challenge)
return payload

def _create_rendezvous_response(self, challenge: RendezvousChallenge) -> RendezvousResponsePayload:
signature = challenge.sign(self.my_peer.key)
payload = RendezvousResponsePayload(challenge, RendezvousSignature(signature))
return payload

def _handle_rendezvous_response(self, peer, raw_payload: RawRendezvousResponsePayload):
signature, _ = self.serializer.unpack_serializable(RendezvousSignature, raw_payload.signature)
challenge, _ = self.serializer.unpack_serializable(RendezvousChallenge, raw_payload.challenge)

expected_nonce = self.rendezvous_cache.get_rendezvous_challenge(peer) or EMPTY_PEER_CHALLENGE
if expected_nonce == EMPTY_PEER_CHALLENGE or expected_nonce != challenge.nonce:
self.logger.warning(f"Received invalid rendezvous response from {peer.mid}")
return

if not self.crypto.is_valid_signature(peer.key, raw_payload.challenge, signature.signature):
self.logger.warning(f"Received invalid signature from {peer.mid}")
return

# This nonce has been burned.
self.rendezvous_cache.clear_peer_challenge(peer)

self.logger.debug(f"Received valid rendezvous response from {peer.mid}")
with db_session(immediate=True):
certificate = self.rdb.Certificate.get(public_key=peer.mid)
if not certificate:
certificate = self.rdb.Certificate(public_key=peer.mid, counter=0)
certificate.counter += 1
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from tribler.core.components.metadata_store.db.store import MetadataStore
from tribler.core.components.metadata_store.remote_query_community.settings import RemoteQueryCommunitySettings
from tribler.core.components.popularity.community.popularity_community import PopularityCommunity
from tribler.core.components.popularity.rendezvous.db.database import RendezvousDatabase
from tribler.core.components.popularity.rendezvous.rendezvous import RendezvousChallenge
from tribler.core.components.torrent_checker.torrent_checker.torrentchecker_session import HealthInfo
from tribler.core.tests.tools.base_test import MockObject
from tribler.core.utilities.path_util import Path
Expand Down Expand Up @@ -210,3 +212,102 @@
await self.init_first_node_and_gossip(
HealthInfo(infohash, seeders=200, leechers=0))
self.nodes[1].overlay.send_remote_select.assert_not_called()


class TestRendezvousLogic(TestBase):
NUM_NODES = 3

def setUp(self):
super().setUp()
self.count = 0
self.initialize(PopularityCommunity, self.NUM_NODES)

def create_node(self, *args, **kwargs):
rdb = RendezvousDatabase(Path(self.temporary_directory()) / f"{self.count + 1}")
torrent_checker = MockObject()
torrent_checker.torrents_checked = {}
mds = MetadataStore(Path(self.temporary_directory()) / f"{self.count}",
Path(self.temporary_directory()),
default_eccrypto.generate_key("curve25519"))
rqc_settings = RemoteQueryCommunitySettings()

self.count += 1
return MockIPv8("curve25519", PopularityCommunity, metadata_store=mds,
rendezvous_db=rdb,
torrent_checker=torrent_checker,
rqc_settings=rqc_settings
)

async def test_introduction_rendezvous_payload(self):
await self.introduce_nodes()
await self.deliver_messages()

self.nodes[0].overlay.send_introduction_request(self.nodes[1].my_peer)
await self.deliver_messages()

self.nodes[0].overlay.send_introduction_request(self.nodes[2].my_peer)
await self.deliver_messages()

with db_session:
assert self.nodes[0].overlay.rdb.Certificate.get(public_key=self.nodes[1].my_peer.mid).counter == 1
assert self.nodes[0].overlay.rdb.Certificate.get(public_key=self.nodes[2].my_peer.mid).counter == 1

# Check if the rendezvous cache is updated
rendezvous_peers = list(self.nodes[1].overlay.rendezvous_cache.get_rendezvous_peers())
assert rendezvous_peers[0] == self.nodes[0].my_peer
rendezvous_peers = list(self.nodes[2].overlay.rendezvous_cache.get_rendezvous_peers())
assert rendezvous_peers[0] == self.nodes[0].my_peer

async def test_rendezvous_payloads(self):
await self.introduce_nodes()
await self.deliver_messages()

self.nodes[0].overlay.send_introduction_request(self.nodes[1].my_peer)
self.nodes[0].overlay.send_introduction_request(self.nodes[2].my_peer)
await self.deliver_messages()

number_of_rendezvous = 4
for _ in range(number_of_rendezvous):
for j in range(self.count):
self.nodes[j].overlay.ping_rendezvous()
await self.deliver_messages()

with db_session:
# Peer 0 should have a counter of 1 more
assert self.nodes[0].overlay.rdb.Certificate.get(
public_key=self.nodes[1].my_peer.mid).counter == number_of_rendezvous + 1
assert self.nodes[1].overlay.rdb.Certificate.get(
public_key=self.nodes[0].my_peer.mid).counter == number_of_rendezvous
assert self.nodes[2].overlay.rdb.Certificate.get(
public_key=self.nodes[0].my_peer.mid).counter == number_of_rendezvous

async def test_invalid_nonce(self):
await self.introduce_nodes()
await self.deliver_messages()

self.nodes[0].overlay.rendezvous_cache.add_peer(self.nodes[1].my_peer, b'1' * 16)

payload = self.nodes[1].overlay._create_rendezvous_response(RendezvousChallenge(b'2' * 16))

Check warning on line 290 in src/tribler/core/components/popularity/community/tests/test_popularity_community.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/tribler/core/components/popularity/community/tests/test_popularity_community.py#L290

Access to a protected member _create_rendezvous_response of a client class
self.nodes[1].overlay.ez_send(self.nodes[0].my_peer, payload)
await self.deliver_messages()

with db_session:
assert self.nodes[0].overlay.rdb.Certificate.get(public_key=self.nodes[1].my_peer.mid) is None

async def test_invalid_signature(self):
await self.introduce_nodes()
await self.deliver_messages()

challenge_1 = RendezvousChallenge(b'1' * 16)
challenge_2 = RendezvousChallenge(b'2' * 16)

self.nodes[0].overlay.rendezvous_cache.add_peer(self.nodes[1].my_peer, challenge_1.nonce)

payload = self.nodes[1].overlay._create_rendezvous_response(challenge_2)

Check warning on line 306 in src/tribler/core/components/popularity/community/tests/test_popularity_community.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/tribler/core/components/popularity/community/tests/test_popularity_community.py#L306

Access to a protected member _create_rendezvous_response of a client class
payload.challenge = challenge_1

self.nodes[1].overlay.ez_send(self.nodes[0].my_peer, payload)
await self.deliver_messages()

with db_session:
assert self.nodes[0].overlay.rdb.Certificate.get(public_key=self.nodes[1].my_peer.mid) is None
12 changes: 11 additions & 1 deletion src/tribler/core/components/popularity/popularity_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@
from tribler.core.components.ipv8.ipv8_component import INFINITE, Ipv8Component
from tribler.core.components.metadata_store.metadata_store_component import MetadataStoreComponent
from tribler.core.components.popularity.community.popularity_community import PopularityCommunity
from tribler.core.components.popularity.rendezvous.db.database import RendezvousDatabase
from tribler.core.components.reporter.reporter_component import ReporterComponent
from tribler.core.components.torrent_checker.torrent_checker_component import TorrentCheckerComponent
from tribler.core.utilities.simpledefs import STATEDIR_DB_DIR


class PopularityComponent(Component):
community: PopularityCommunity = None
RENDEZVOUS_DB_NAME = 'rendezvous.db'

community: PopularityCommunity = None
rendezvous_db: RendezvousDatabase = None
_ipv8_component: Ipv8Component = None

async def run(self):
Expand All @@ -22,13 +26,17 @@ async def run(self):
metadata_store_component = await self.require_component(MetadataStoreComponent)
torrent_checker_component = await self.require_component(TorrentCheckerComponent)

self.rendezvous_db = RendezvousDatabase(
db_path=self.session.config.state_dir / STATEDIR_DB_DIR / self.RENDEZVOUS_DB_NAME)

config = self.session.config
community = PopularityCommunity(self._ipv8_component.peer,
self._ipv8_component.ipv8.endpoint,
Network(),
settings=config.popularity_community,
rqc_settings=config.remote_query_community,
metadata_store=metadata_store_component.mds,
rendezvous_db=self.rendezvous_db,
torrent_checker=torrent_checker_component.torrent_checker)
self.community = community

Expand All @@ -39,3 +47,5 @@ async def shutdown(self):
await super().shutdown()
if self._ipv8_component and self.community:
await self._ipv8_component.unload_community(self.community)
if self.rendezvous_db:
self.rendezvous_db.shutdown()
Empty file.
Empty file.
36 changes: 36 additions & 0 deletions src/tribler/core/components/popularity/rendezvous/db/database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from pathlib import Path
from typing import Union

from pony.orm import Database, db_session

from tribler.core.components.metadata_store.db.orm_bindings import misc
from tribler.core.components.popularity.rendezvous.db.orm_bindings import certificate
from tribler.core.utilities.utilities import MEMORY_DB


class RendezvousDatabase:
DB_VERSION = 0

def __init__(self, db_path: Union[Path, type(MEMORY_DB)]):

self.database = Database()

self.MiscData = misc.define_binding(self.database)
self.Certificate = certificate.define_binding(self.database)

if db_path is MEMORY_DB:
create_db = True
db_path_string = ":memory:"

Check warning on line 23 in src/tribler/core/components/popularity/rendezvous/db/database.py

View check run for this annotation

Codecov / codecov/patch

src/tribler/core/components/popularity/rendezvous/db/database.py#L22-L23

Added lines #L22 - L23 were not covered by tests
else:
create_db = not db_path.is_file()
db_path_string = str(db_path)

self.database.bind(provider='sqlite', filename=db_path_string, create_db=create_db, timeout=120.0)
self.database.generate_mapping(create_tables=create_db)

if create_db:
with db_session:
self.MiscData(name="db_version", value=str(self.DB_VERSION))

def shutdown(self) -> None:
self.database.disconnect()
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from pony.orm import Required, db_session

Check warning on line 1 in src/tribler/core/components/popularity/rendezvous/db/orm_bindings/certificate.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/tribler/core/components/popularity/rendezvous/db/orm_bindings/certificate.py#L1

Unused db_session imported from pony.orm


def define_binding(db):
class RendezvousCertificate(db.Entity):
public_key = Required(bytes, index=True)
counter = Required(int)

return RendezvousCertificate
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from pony.orm import Optional, PrimaryKey

Check warning on line 1 in src/tribler/core/components/popularity/rendezvous/db/orm_bindings/misc.py

View check run for this annotation

Codecov / codecov/patch

src/tribler/core/components/popularity/rendezvous/db/orm_bindings/misc.py#L1

Added line #L1 was not covered by tests


def define_binding(db):
class MiscData(db.Entity):
name = PrimaryKey(str)
value = Optional(str)

Check warning on line 7 in src/tribler/core/components/popularity/rendezvous/db/orm_bindings/misc.py

View check run for this annotation

Codecov / codecov/patch

src/tribler/core/components/popularity/rendezvous/db/orm_bindings/misc.py#L4-L7

Added lines #L4 - L7 were not covered by tests

return MiscData

Check warning on line 9 in src/tribler/core/components/popularity/rendezvous/db/orm_bindings/misc.py

View check run for this annotation

Codecov / codecov/patch

src/tribler/core/components/popularity/rendezvous/db/orm_bindings/misc.py#L9

Added line #L9 was not covered by tests
Loading
Loading