Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add typings to the Download class #6880

Merged
merged 3 commits into from
Apr 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 45 additions & 44 deletions src/tribler/core/components/libtorrent/download_manager/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import logging
from asyncio import CancelledError, Future, iscoroutine, sleep, wait_for
from collections import defaultdict
from typing import Optional
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple

from ipv8.taskmanager import TaskManager, task
from ipv8.util import int2byte, succeed
Expand Down Expand Up @@ -48,7 +48,7 @@ def __init__(self,

self.dummy = dummy
self.tdef = tdef
self.handle = None
self.handle: Optional[lt.torrent_handle] = None
self.state_dir = state_dir
self.dlmgr = download_manager
self.download_defaults = download_defaults or DownloadDefaultsSettings()
Expand All @@ -58,7 +58,7 @@ def __init__(self,
self.hidden = False

# Libtorrent status
self.lt_status = None
self.lt_status: Optional[lt.torrent_status] = None
self.error = None
self.pause_after_next_hashcheck = False
self.checkpoint_after_next_hashcheck = False
Expand Down Expand Up @@ -109,7 +109,7 @@ def add_stream(self):
assert self.stream is None
self.stream = Stream(self)

def get_torrent_data(self):
def get_torrent_data(self) -> Optional[object]:
"""
Return torrent data, if the handle is valid and metadata is available.
"""
Expand All @@ -120,10 +120,11 @@ def get_torrent_data(self):
t = lt.create_torrent(torrent_info)
return t.generate()

def register_alert_handler(self, alert_type, handler):
def register_alert_handler(self, alert_type: str, handler: lt.torrent_handle):
self.alert_handlers[alert_type].append(handler)

def wait_for_alert(self, success_type, success_getter=None, fail_type=None, fail_getter=None):
def wait_for_alert(self, success_type: str, success_getter: Optional[Callable[[Any], Any]] = None,
fail_type: str = None, fail_getter: Optional[Callable[[Any], Any]] = None):
future = Future()
if success_type:
self.futures[success_type].append((future, future.set_result, success_getter))
Expand All @@ -135,10 +136,10 @@ async def wait_for_status(self, *status):
while self.get_state().get_status() not in status:
await self.wait_for_alert('state_changed_alert')

def get_def(self):
def get_def(self) -> TorrentDef:
return self.tdef

def get_handle(self):
def get_handle(self) -> Awaitable[lt.torrent_handle]:
"""
Returns a deferred that fires with a valid libtorrent download handle.
"""
Expand All @@ -147,13 +148,13 @@ def get_handle(self):

return self.wait_for_alert('add_torrent_alert', lambda a: a.handle)

def get_atp(self):
def get_atp(self) -> Dict:
save_path = self.config.get_dest_dir()
atp = {"save_path": str(save_path),
"storage_mode": lt.storage_mode_t.storage_mode_sparse,
"flags": lt.add_torrent_params_flags_t.flag_paused
| lt.add_torrent_params_flags_t.flag_duplicate_is_error
| lt.add_torrent_params_flags_t.flag_update_subscribe}
| lt.add_torrent_params_flags_t.flag_duplicate_is_error
| lt.add_torrent_params_flags_t.flag_update_subscribe}

if self.config.get_share_mode():
atp["flags"] = atp["flags"] | lt.add_torrent_params_flags_t.flag_share_mode
Expand All @@ -179,7 +180,7 @@ def get_atp(self):

return atp

def on_add_torrent_alert(self, alert):
def on_add_torrent_alert(self, alert: lt.add_torrent_alert):
self._logger.info(f'On add torrent alert: {alert}')

if hasattr(alert, 'error') and alert.error.value():
Expand Down Expand Up @@ -216,11 +217,11 @@ def on_add_torrent_alert(self, alert):

self.checkpoint()

def get_anon_mode(self):
def get_anon_mode(self) -> bool:
return self.config.get_hops() > 0

@check_handle(b'')
def get_pieces_base64(self):
def get_pieces_base64(self) -> bytes:
"""
Returns a base64 encoded bitmask of the pieces that we have.
"""
Expand All @@ -233,13 +234,13 @@ def get_pieces_base64(self):
encoded_str += int2byte(int(bitstr[i:i + 8].ljust(8, b'0'), 2))
return base64.b64encode(encoded_str)

def post_alert(self, alert_type, alert_dict=None):
def post_alert(self, alert_type: str, alert_dict: Optional[Dict] = None):
alert_dict = alert_dict or {}
alert_dict['category'] = lambda _: None
alert = type('anonymous_alert', (object,), alert_dict)()
return self.process_alert(alert, alert_type)
self.process_alert(alert, alert_type)

def process_alert(self, alert, alert_type):
def process_alert(self, alert: lt.torrent_alert, alert_type: str):
if alert.category() in [lt.alert.category_t.error_notification, lt.alert.category_t.performance_warning]:
self._logger.debug("Got alert: %s", alert)

Expand All @@ -250,10 +251,10 @@ def process_alert(self, alert, alert_type):
if not future.done():
future_setter(getter(alert) if getter else alert)

def on_torrent_error_alert(self, alert):
def on_torrent_error_alert(self, alert: lt.torrent_error_alert):
self._logger.error(f'On torrent error alert: {alert}')

def on_state_changed_alert(self, alert):
def on_state_changed_alert(self, alert: lt.state_changed_alert):
self._logger.info(f'On state changed alert: {alert}')

if not self.handle:
Expand All @@ -268,7 +269,7 @@ def on_state_changed_alert(self, alert):
if alert.state == lt.torrent_status.downloading and isinstance(self.tdef, TorrentDefNoMetainfo):
self.post_alert('metadata_received_alert')

def on_save_resume_data_alert(self, alert):
def on_save_resume_data_alert(self, alert: lt.save_resume_data_alert):
"""
Callback for the alert that contains the resume data of a specific download.
This resume data will be written to a file on disk.
Expand Down Expand Up @@ -299,12 +300,12 @@ def on_save_resume_data_alert(self, alert):
self.config.write(str(filename))
self._logger.debug('Saving download config to file %s', filename)

def on_tracker_reply_alert(self, alert):
def on_tracker_reply_alert(self, alert: lt.tracker_reply_alert):
self._logger.info(f'On tracker reply alert: {alert}')

self.tracker_status[alert.url] = [alert.num_peers, 'Working']

def on_tracker_error_alert(self, alert):
def on_tracker_error_alert(self, alert: lt.tracker_error_alert):
self._logger.error(f'On tracker error alert: {alert}')

# try-except block here is a workaround and has been added to solve
Expand All @@ -326,7 +327,7 @@ def on_tracker_error_alert(self, alert):
self._logger.exception(e)
return

def on_tracker_warning_alert(self, alert):
def on_tracker_warning_alert(self, alert: lt.tracker_warning_alert):
self._logger.warning(f'On tracker warning alert: {alert}')

peers = self.tracker_status[alert.url][0] if alert.url in self.tracker_status else 0
Expand All @@ -335,7 +336,7 @@ def on_tracker_warning_alert(self, alert):
self.tracker_status[alert.url] = [peers, status]

@check_handle()
def on_metadata_received_alert(self, alert):
def on_metadata_received_alert(self, alert: lt.metadata_received_alert):
self._logger.info(f'On metadata received alert: {alert}')

torrent_info = get_info_from_handle(self.handle)
Expand Down Expand Up @@ -364,7 +365,7 @@ def on_metadata_received_alert(self, alert):
self.set_selected_files()
self.checkpoint()

def on_performance_alert(self, alert):
def on_performance_alert(self, alert: lt.performance_alert):
self._logger.info(f'On performance alert: {alert}')

if self.get_anon_mode() or self.dlmgr.ltsessions is None:
Expand All @@ -387,13 +388,13 @@ def on_performance_alert(self, alert):
settings['max_queued_disk_bytes'] *= 2
self.dlmgr.set_session_settings(self.dlmgr.get_session(), settings)

def on_torrent_removed_alert(self, alert):
def on_torrent_removed_alert(self, alert: lt.torrent_removed_alert):
self._logger.info(f'On torrent remove alert: {alert}')

self._logger.debug("Removing %s", self.tdef.get_name())
self.handle = None

def on_torrent_checked_alert(self, alert):
def on_torrent_checked_alert(self, alert: lt.torrent_checked_alert):
self._logger.info(f'On torrent checked alert: {alert}')

if self.pause_after_next_hashcheck:
Expand All @@ -404,7 +405,7 @@ def on_torrent_checked_alert(self, alert):
self.checkpoint()

@check_handle()
def on_torrent_finished_alert(self, alert):
def on_torrent_finished_alert(self, alert: lt.torrent_finished_alert):
self._logger.info(f'On torrent finished alert: {alert}')
self.update_lt_status(self.handle.status())
self.checkpoint()
Expand All @@ -415,7 +416,7 @@ def on_torrent_finished_alert(self, alert):
hidden = self.hidden or self.config.get_channel_download()
self.notifier[notifications.torrent_finished](infohash=infohash, name=name, hidden=hidden)

def update_lt_status(self, lt_status):
def update_lt_status(self, lt_status: lt.torrent_status):
""" Update libtorrent stats and check if the download should be stopped."""
self.lt_status = lt_status
self._stop_if_finished()
Expand All @@ -432,7 +433,7 @@ def _stop_if_finished(self):
self.stop()

@check_handle()
def set_selected_files(self, selected_files=None, prio=4, force=False):
def set_selected_files(self, selected_files=None, prio: int = 4, force: bool = False):
if not force and self.stream is not None:
return
if not isinstance(self.tdef, TorrentDefNoMetainfo) and not self.get_share_mode():
Expand All @@ -453,7 +454,7 @@ def set_selected_files(self, selected_files=None, prio=4, force=False):
self.set_file_priorities(filepriorities)

@check_handle(False)
def move_storage(self, new_dir):
def move_storage(self, new_dir: Path):
if not isinstance(self.tdef, TorrentDefNoMetainfo):
self.handle.move_storage(str(new_dir))
self.config.set_dest_dir(new_dir)
Expand All @@ -474,7 +475,7 @@ def get_state(self):
return DownloadState(self, self.lt_status, self.error)

@task
async def save_resume_data(self, timeout=10):
async def save_resume_data(self, timeout: int = 10):
"""
Save the resume data of a download. This method returns when the resume data is available.
Note that this method only calls save_resume_data once on subsequent calls.
Expand All @@ -490,7 +491,7 @@ async def save_resume_data(self, timeout=10):
except (CancelledError, SaveResumeDataError, TimeoutError) as e:
self._logger.error("Resume data failed to save: %s", e)

def get_peerlist(self):
def get_peerlist(self) -> List[Dict[Any, Any]]:
""" Returns a list of dictionaries, one for each connected peer
containing the statistics for that peer. In particular, the
dictionary contains the keys:
Expand Down Expand Up @@ -552,7 +553,7 @@ def get_peerlist(self):
peers.append(peer_dict)
return peers

def get_num_connected_seeds_peers(self):
def get_num_connected_seeds_peers(self) -> Tuple[int, int]:
""" Returns number of connected seeders and leechers """
num_seeds = num_peers = 0
if not self.handle or not self.handle.is_valid():
Expand All @@ -566,7 +567,7 @@ def get_num_connected_seeds_peers(self):

return num_seeds, num_peers

def get_torrent(self):
def get_torrent(self) -> object:
if not self.handle or not self.handle.is_valid() or not self.handle.has_metadata():
return None

Expand Down Expand Up @@ -643,7 +644,7 @@ def resume(self):
self.handle.set_upload_mode(self.get_upload_mode())
self.handle.resume()

def get_content_dest(self):
def get_content_dest(self) -> Path:
""" Returns the file to which the downloaded content is saved. """
return self.config.get_dest_dir() / fix_filebasename(self.tdef.get_name_as_unicode())

Expand Down Expand Up @@ -678,17 +679,17 @@ def checkpoint(self):
return succeed(None)
return self.save_resume_data()

def set_def(self, tdef):
def set_def(self, tdef: TorrentDef):
self.tdef = tdef

@check_handle()
def add_trackers(self, trackers):
def add_trackers(self, trackers: List[str]):
if hasattr(self.handle, 'add_tracker'):
for tracker in trackers:
self.handle.add_tracker({'url': tracker, 'verified': False})

@check_handle()
def get_magnet_link(self):
def get_magnet_link(self) -> str:
return lt.make_magnet_uri(self.handle)

@require_handle
Expand All @@ -699,19 +700,19 @@ def add_peer(self, addr):
self.handle.connect_peer(addr, 0)

@require_handle
def set_priority(self, prio):
self.handle.set_priority(prio)
def set_priority(self, priority: int):
self.handle.set_priority(priority)

@require_handle
def set_max_upload_rate(self, value):
def set_max_upload_rate(self, value: int):
self.handle.set_upload_limit(value * 1024)

@require_handle
def set_max_download_rate(self, value):
def set_max_download_rate(self, value: int):
self.handle.set_download_limit(value * 1024)

@require_handle
def apply_ip_filter(self, enable):
def apply_ip_filter(self, enable: bool):
self.handle.apply_ip_filter(enable)

def get_share_mode(self):
Expand Down
15 changes: 10 additions & 5 deletions src/tribler/core/components/libtorrent/utils/torrent_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from asyncio import CancelledError, Future
from contextlib import suppress
from hashlib import sha1
from typing import Any, Dict, Iterable, List, Optional

from tribler.core.components.libtorrent.utils.libtorrent_helper import libtorrent as lt
from tribler.core.utilities.path_util import Path
Expand Down Expand Up @@ -48,15 +49,19 @@ def check_vod(default=None):
"""
Check if torrent is vod mode, else return default
"""

def wrap(f):
def invoke_func(self, *args, **kwargs):
if self.enabled:
return f(self, *args, **kwargs)
return default

return invoke_func

return wrap

def commonprefix(paths_list):

def common_prefix(paths_list: List[Path]) -> Path:
# this unlike the os path .commonprefix version always returns path prefixes as it compares
# path component wise.
base_set = set(paths_list[0].parents)
Expand All @@ -66,7 +71,7 @@ def commonprefix(paths_list):
return sorted(base_set, reverse=True)[0]


def _existing_files(path_list):
def _existing_files(path_list: List[Path]) -> Iterable[Path]:
for path in path_list:
path = Path(path)
if not path.exists():
Expand All @@ -75,7 +80,7 @@ def _existing_files(path_list):
yield path


def create_torrent_file(file_path_list, params, torrent_filepath=None):
def create_torrent_file(file_path_list: List[Path], params: Dict[bytes, Any], torrent_filepath: Optional[str] = None):
fs = lt.file_storage()

# filter all non-files
Expand All @@ -84,7 +89,7 @@ def create_torrent_file(file_path_list, params, torrent_filepath=None):
# ACHTUNG!
# In the case of a multi-file torrent, the torrent name plays the role of the toplevel dir name.
# get the directory where these files are in. If there are multiple files, take the common directory they are in
base_dir = (commonprefix(path_list).parent if len(path_list) > 1 else path_list[0].parent).absolute()
base_dir = (common_prefix(path_list).parent if len(path_list) > 1 else path_list[0].parent).absolute()
for path in path_list:
relative = path.relative_to(base_dir)
fs.add_file(str(relative), path.size())
Expand Down Expand Up @@ -151,7 +156,7 @@ def create_torrent_file(file_path_list, params, torrent_filepath=None):
}


def get_info_from_handle(handle):
def get_info_from_handle(handle: lt.torrent_handle) -> Optional[lt.torrent_info]:
# In libtorrent 0.16.18, the torrent_handle.torrent_file method is not available.
# this method checks whether the torrent_file method is available on a given handle.
# If not, fall back on the deprecated get_torrent_info
Expand Down
Loading