Skip to content

Commit

Permalink
Added popularity experiment
Browse files Browse the repository at this point in the history
  • Loading branch information
devos50 committed Mar 29, 2019
1 parent dd4facd commit 0aaf683
Show file tree
Hide file tree
Showing 10 changed files with 4,285 additions and 1 deletion.
3 changes: 3 additions & 0 deletions experiments/popularity/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""
This module contains experiments around the popularity community.
"""
25 changes: 25 additions & 0 deletions experiments/popularity/generate_torrent_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""
Generate a file that contains torrent information.
This file can then be loaded in an experiment to initialize channels.
"""
import random

from six.moves import xrange

PEERS = 20
TORRENTS_IN_CHANNEL = 200
DEAD_TORRENT_RATE = 0.619


def random_infohash():
""" Generates a random torrent infohash string """
return ''.join(random.choice('0123456789abcdef') for _ in range(40))


with open("torrents.txt", "w") as torrents_file:
for peer in xrange(PEERS):
for _ in xrange(TORRENTS_IN_CHANNEL):
infohash = random_infohash()
seeders = 0 if random.random() < DEAD_TORRENT_RATE else random.randint(1, 10000)
leechers = random.randint(0, 10000)
torrents_file.write("%d,%s,%d,%d\n" % (peer + 1, infohash, seeders, leechers))
44 changes: 44 additions & 0 deletions experiments/popularity/popularity_experiment.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
experiment_name = "popularity_experiment"

experiment_server_cmd = 'experiment_server.py'

local_setup_cmd = 'das4_setup.sh'

tracker_cmd = 'run_tracker.sh'
tracker_profile = 'true'

local_instance_cmd = 'das4_reserve_and_run.sh'

post_process_cmd = 'post_process_ipv8_experiment.sh'

#Run python in optimized mode?
PYTHONOPTIMIZE = yup
use_local_venv = TRUE

# The following options are used by das4_reserve_and_run.sh

# How many nodes do we want? (seconds)
das4_node_amount = 1

# Kill the processes if they don't die after this many seconds
das4_node_timeout = 120

# How many processes do we want to spawn?
das4_instances_to_run = 20

# What command do we want to run?
das4_node_command = "launch_scenario.py"
scenario_file = "popularity_experiment.scenario"

#messages_to_plot = 'ask,bid'

# The following options are used by the sync server

# Delay between sending the experiment info and the start signal
sync_experiment_start_delay = 1

sync_port = __unique_port__

tracker_port = __unique_port__

#extra_r_scripts_to_run = "plot_popularity.r"
23 changes: 23 additions & 0 deletions experiments/popularity/popularity_experiment.scenario
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
&module gumby.modules.tribler_module.TriblerModule
&module experiments.gigachannel.gigachannel_module.GigaChannelModule
&module experiments.popularity.popularity_module.PopularityModule

@0:0 annotate start-experiment
@0:1 isolate_ipv8_overlay GigaChannelCommunity
@0:1 isolate_ipv8_overlay PopularityCommunity
@0:2 start_session
@0:3 add_walking_strategy RandomWalk -1
@0:5 set_fake_dht_health_manager
@0:10 annotate create_channel
@0:10 create_channel
@0:15 insert_torrents_from_file torrents.txt
@0:45 annotate start_publish
@0:45 start_health_poll 5
@0:45 introduce_peers_popularity
@0:45 introduce_peers_gigachannels
@0:45 set_torrent_check_interval 5
@0:110 write_overlay_statistics
@0:110 write_torrent_health_statistics
@0:110 stop_health_poll
@0:115 stop_session
@0:120 stop
145 changes: 145 additions & 0 deletions experiments/popularity/popularity_module.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import logging
import os
import random
from binascii import hexlify, unhexlify

from Tribler.community.popularity.community import PopularityCommunity
from Tribler.Core.Modules.MetadataStore.OrmBindings.channel_node import NEW
from Tribler.pyipv8.ipv8.taskmanager import TaskManager
from pony.orm import db_session, count
from twisted.internet import reactor
from twisted.internet.defer import Deferred
from twisted.internet.task import LoopingCall

from gumby.experiment import experiment_callback
from gumby.modules.community_experiment_module import IPv8OverlayExperimentModule
from gumby.modules.experiment_module import static_module


class FakeDHTHealthManager(TaskManager):
"""
This is a fake DHT health manager which gets its file information from a local source.
"""

def __init__(self):
TaskManager.__init__(self)
self._logger = logging.getLogger(self.__class__.__name__)
self.torrent_healths = {} # Dictionary from infohash -> (seeders, leechers)

def get_health(self, infohash, **_):
self._logger.info("Getting health info for infohash %s", hexlify(infohash))
deferred = Deferred()

seeders, peers = 0, 0
if infohash in self.torrent_healths:
seeders, peers = self.torrent_healths[infohash]

health_response = {
"DHT": [{
"infohash": hexlify(infohash),
"seeders": seeders,
"leechers": peers
}]
}

self.register_task("lookup_%s" % hexlify(infohash),
reactor.callLater(random.randint(1, 7), deferred.callback, health_response))
return deferred


@static_module
class PopularityModule(IPv8OverlayExperimentModule):
"""
This module contains code to manage experiments with the popularity community.
"""

def __init__(self, experiment):
super(PopularityModule, self).__init__(experiment, PopularityCommunity)
self._logger = logging.getLogger(self.__class__.__name__)
self.fake_dht_health_manager = None
self.health_poll_lc = LoopingCall(self.write_periodic_torrent_health_statistics)

def on_id_received(self):
super(PopularityModule, self).on_id_received()
self.tribler_config.set_popularity_community_enabled(True)
self.tribler_config.set_torrent_checking_enabled(True)

self.autoplot_create('num_healths', 'Number of torrent healths')

@experiment_callback
def start_health_poll(self, interval):
self.health_poll_lc.start(int(interval))

@experiment_callback
def stop_health_poll(self):
self.health_poll_lc.stop()

@experiment_callback
def set_fake_dht_health_manager(self):
self.fake_dht_health_manager = FakeDHTHealthManager()
self.session.lm.ltmgr.dht_health_manager = self.fake_dht_health_manager

@experiment_callback
def introduce_peers_popularity(self):
for peer_id in self.all_vars.iterkeys():
if int(peer_id) != self.my_id:
self.overlay.walk_to(self.experiment.get_peer_ip_port_by_id(peer_id))

@experiment_callback
def set_torrent_check_interval(self, interval):
interval = int(interval)
self._logger.info("Changing torrent check interval to %d seconds", interval)
torrent_checker = self.overlay.torrent_checker
torrent_checker.cancel_pending_task("torrent_check")
torrent_checker.torrent_check_lc = torrent_checker.register_task(
"torrent_check", LoopingCall(torrent_checker.check_random_torrent))
torrent_checker.torrent_check_lc.start(interval)

@experiment_callback
def insert_torrents_from_file(self, filename):
torrent_healths = {}
dir_path = os.path.dirname(os.path.realpath(__file__))
with db_session:
my_channel = self.overlay.metadata_store.ChannelMetadata.get_my_channel()
with open(os.path.join(dir_path, filename)) as torrents_file:
for line in torrents_file.readlines():
line = line.rstrip()
parts = line.split(",")
if len(parts) != 4:
continue

if int(parts[0]) == self.my_id:
# Add the torrent to you channel
new_entry_dict = {
"infohash": unhexlify(parts[1]),
"title": parts[1],
"size": 1024,
"status": NEW
}
_ = self.overlay.metadata_store.TorrentMetadata.from_dict(new_entry_dict)

# Update torrent health
torrent_healths[unhexlify(parts[1])] = (int(parts[2]), int(parts[3]))

my_channel.commit_channel_torrent()

if self.fake_dht_health_manager:
self.fake_dht_health_manager.torrent_healths.update(torrent_healths)

@experiment_callback
def write_periodic_torrent_health_statistics(self):
with db_session:
torrent_healths = count(state for state in self.overlay.metadata_store.TorrentState.select()
if state.last_check != 0)
self.autoplot_add_point('num_healths', torrent_healths)

@experiment_callback
def write_torrent_health_statistics(self):
with open("healths.txt", "w") as healths_file:
with db_session:
torrent_healths = self.overlay.metadata_store.TorrentState.select()[:]
for torrent_health in torrent_healths:
healths_file.write("%s,%d,%d,%d\n" % (hexlify(torrent_health.infohash),
torrent_health.seeders,
torrent_health.leechers,
torrent_health.last_check))
Loading

0 comments on commit 0aaf683

Please sign in to comment.