forked from PokemonGoF/PokemonGo-Bot
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Display stats in the terminal title (PokemonGoF#2252)
* Added UpdateTitleStats worker * Added UpdateTitleStats worker * Fixed return inconsistency in work method * 🚀 Massively improved pylint rate Cleaned ctypes unnecessary imports Moved initialization inside __init__ method * Fixed incorrect default value for min_interval * Added support for cygwin on Windows
- Loading branch information
Showing
3 changed files
with
365 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,232 @@ | ||
import ctypes | ||
from sys import stdout, platform as _platform | ||
from datetime import datetime, timedelta | ||
|
||
from pokemongo_bot.cell_workers.base_task import BaseTask | ||
from pokemongo_bot.worker_result import WorkerResult | ||
from pokemongo_bot.tree_config_builder import ConfigException | ||
|
||
|
||
class UpdateTitleStats(BaseTask): | ||
""" | ||
Periodically updates the terminal title to display stats about the bot. | ||
Fetching some stats requires making API calls. If you're concerned about the amount of calls | ||
your bot is making, don't enable this worker. | ||
Example config : | ||
{ | ||
"type": "UpdateTitleStats", | ||
"config": { | ||
"min_interval": 10, | ||
"stats": ["uptime", "km_walked", "level_stats", "xp_earned", "xp_per_hour"] | ||
} | ||
} | ||
Available stats : | ||
- uptime : The bot uptime. | ||
- km_walked : The kilometers walked since the bot started. | ||
- level : The current character's level. | ||
- level_completion : The current level experience, the next level experience and the completion | ||
percentage. | ||
- level_stats : Puts together the current character's level and its completion. | ||
- xp_per_hour : The estimated gain of experience per hour. | ||
- xp_earned : The experience earned since the bot started. | ||
- stops_visited : The number of visited stops. | ||
- pokemon_encountered : The number of encountered pokemon. | ||
- pokemon_caught : The number of caught pokemon. | ||
- pokemon_released : The number of released pokemon. | ||
- pokemon_evolved : The number of evolved pokemon. | ||
- pokemon_unseen : The number of pokemon never seen before. | ||
- pokemon_stats : Puts together the pokemon encountered, caught, released, evolved and unseen. | ||
- pokeballs_thrown : The number of thrown pokeballs. | ||
- stardust_earned : The number of earned stardust since the bot started. | ||
- highest_cp_pokemon : The caught pokemon with the highest CP since the bot started. | ||
- most_perfect_pokemon : The most perfect caught pokemon since the bot started. | ||
min_interval : The minimum interval at which the title is updated, | ||
in seconds (defaults to 10 seconds). | ||
The update interval cannot be accurate as workers run synchronously. | ||
stats : An array of stats to display and their display order (implicitly), | ||
see available stats above. | ||
""" | ||
|
||
DEFAULT_MIN_INTERVAL = 10 | ||
DEFAULT_DISPLAYED_STATS = [] | ||
|
||
def __init__(self, bot, config): | ||
""" | ||
Initializes the worker. | ||
:param bot: The bot instance. | ||
:type bot: PokemonGoBot | ||
:param config: The task configuration. | ||
:type config: dict | ||
""" | ||
super(UpdateTitleStats, self).__init__(bot, config) | ||
|
||
self.next_update = None | ||
self.min_interval = self.DEFAULT_MIN_INTERVAL | ||
self.displayed_stats = self.DEFAULT_DISPLAYED_STATS | ||
|
||
self._process_config() | ||
|
||
def initialize(self): | ||
pass | ||
|
||
def work(self): | ||
""" | ||
Updates the title if necessary. | ||
:return: Always returns WorkerResult.SUCCESS. | ||
:rtype: WorkerResult | ||
""" | ||
if not self._should_display(): | ||
return WorkerResult.SUCCESS | ||
title = self._get_stats_title(self._get_player_stats()) | ||
# If title is empty, it couldn't be generated. | ||
if not title: | ||
return WorkerResult.SUCCESS | ||
self._update_title(title, _platform) | ||
return WorkerResult.SUCCESS | ||
|
||
def _should_display(self): | ||
""" | ||
Returns a value indicating whether the title should be updated. | ||
:return: True if the title should be updated; otherwise, False. | ||
:rtype: bool | ||
""" | ||
return self.next_update is None or datetime.now() >= self.next_update | ||
|
||
def _update_title(self, title, platform): | ||
""" | ||
Updates the window title using different methods, according to the given platform | ||
:param title: The new window title. | ||
:type title: string | ||
:param platform: The platform string. | ||
:type platform: string | ||
:return: Nothing. | ||
:rtype: None | ||
:raise: RuntimeError: When the given platform isn't supported. | ||
""" | ||
if platform == "linux" or platform == "linux2"\ | ||
or platform == "darwin" or platform == "cygwin": | ||
stdout.write("\x1b]2;{}\x07".format(title)) | ||
elif platform == "win32": | ||
ctypes.windll.kernel32.SetConsoleTitleA(title) | ||
else: | ||
raise RuntimeError("unsupported platform '{}'".format(platform)) | ||
|
||
self.next_update = datetime.now() + timedelta(seconds=self.min_interval) | ||
|
||
def _process_config(self): | ||
""" | ||
Fetches the configuration for this worker and stores the values internally. | ||
:return: Nothing. | ||
:rtype: None | ||
""" | ||
self.min_interval = int(self.config.get('min_interval', self.DEFAULT_MIN_INTERVAL)) | ||
self.displayed_stats = self.config.get('stats', self.DEFAULT_DISPLAYED_STATS) | ||
|
||
def _get_stats_title(self, player_stats): | ||
""" | ||
Generates a stats string with the given player stats according to the configuration. | ||
:return: A string containing human-readable stats, ready to be displayed. | ||
:rtype: string | ||
""" | ||
# No player stats available, won't be able to gather all informations. | ||
if player_stats is None: | ||
return '' | ||
# No stats to display, avoid any useless overhead. | ||
if not self.displayed_stats: | ||
return '' | ||
|
||
# Gather stats values. | ||
metrics = self.bot.metrics | ||
metrics.capture_stats() | ||
runtime = metrics.runtime() | ||
distance_travelled = metrics.distance_travelled() | ||
current_level = int(player_stats.get('level', 0)) | ||
prev_level_xp = int(player_stats.get('prev_level_xp', 0)) | ||
next_level_xp = int(player_stats.get('next_level_xp', 0)) | ||
experience = int(player_stats.get('experience', 0)) | ||
current_level_xp = experience - prev_level_xp | ||
whole_level_xp = next_level_xp - prev_level_xp | ||
level_completion_percentage = int((current_level_xp * 100) / whole_level_xp) | ||
experience_per_hour = int(metrics.xp_per_hour()) | ||
xp_earned = metrics.xp_earned() | ||
stops_visited = metrics.visits['latest'] - metrics.visits['start'] | ||
pokemon_encountered = metrics.num_encounters() | ||
pokemon_caught = metrics.num_captures() | ||
pokemon_released = metrics.releases | ||
pokemon_evolved = metrics.num_evolutions() | ||
pokemon_unseen = metrics.num_new_mons() | ||
pokeballs_thrown = metrics.num_throws() | ||
stardust_earned = metrics.earned_dust() | ||
highest_cp_pokemon = metrics.highest_cp['desc'] | ||
if not highest_cp_pokemon: | ||
highest_cp_pokemon = "None" | ||
most_perfect_pokemon = metrics.most_perfect['desc'] | ||
if not most_perfect_pokemon: | ||
most_perfect_pokemon = "None" | ||
|
||
# Create stats strings. | ||
available_stats = { | ||
'uptime': 'Uptime : {}'.format(runtime), | ||
'km_walked': '{:,.2f}km walked'.format(distance_travelled), | ||
'level': 'Level {}'.format(current_level), | ||
'level_completion': '{:,} / {:,} XP ({}%)'.format(current_level_xp, whole_level_xp, | ||
level_completion_percentage), | ||
'level_stats': 'Level {} ({:,} / {:,}, {}%)'.format(current_level, current_level_xp, | ||
whole_level_xp, | ||
level_completion_percentage), | ||
'xp_per_hour': '{:,} XP/h'.format(experience_per_hour), | ||
'xp_earned': '+{:,} XP'.format(xp_earned), | ||
'stops_visited': 'Visited {:,} stops'.format(stops_visited), | ||
'pokemon_encountered': 'Encountered {:,} pokemon'.format(pokemon_encountered), | ||
'pokemon_caught': 'Caught {:,} pokemon'.format(pokemon_caught), | ||
'pokemon_released': 'Released {:,} pokemon'.format(pokemon_released), | ||
'pokemon_evolved': 'Evolved {:,} pokemon'.format(pokemon_evolved), | ||
'pokemon_unseen': 'Encountered {} new pokemon'.format(pokemon_unseen), | ||
'pokemon_stats': 'Encountered {:,} pokemon, {:,} caught, {:,} released, {:,} evolved, ' | ||
'{} never seen before'.format(pokemon_encountered, pokemon_caught, | ||
pokemon_released, pokemon_evolved, | ||
pokemon_unseen), | ||
'pokeballs_thrown': 'Threw {:,} pokeballs'.format(pokeballs_thrown), | ||
'stardust_earned': 'Earned {:,} Stardust'.format(stardust_earned), | ||
'highest_cp_pokemon': 'Highest CP pokemon : {}'.format(highest_cp_pokemon), | ||
'most_perfect_pokemon': 'Most perfect pokemon : {}'.format(most_perfect_pokemon), | ||
} | ||
|
||
def get_stat(stat): | ||
""" | ||
Fetches a stat string from the available stats dictionary. | ||
:param stat: The stat name. | ||
:type stat: string | ||
:return: The generated stat string. | ||
:rtype: string | ||
:raise: ConfigException: When the provided stat string isn't in the available stats | ||
dictionary. | ||
""" | ||
if stat not in available_stats: | ||
raise ConfigException("stat '{}' isn't available for displaying".format(stat)) | ||
return available_stats[stat] | ||
|
||
# Map stats the user wants to see to available stats and join them with pipes. | ||
title = ' | '.join(map(get_stat, self.displayed_stats)) | ||
|
||
return title | ||
|
||
def _get_player_stats(self): | ||
""" | ||
Helper method parsing the bot inventory object and returning the player stats object. | ||
:return: The player stats object. | ||
:rtype: dict | ||
""" | ||
inventory_items = self.bot.get_inventory() \ | ||
.get('responses', {}) \ | ||
.get('GET_INVENTORY', {}) \ | ||
.get('inventory_delta', {}) \ | ||
.get('inventory_items', {}) | ||
return next((x["inventory_item_data"]["player_stats"] | ||
for x in inventory_items | ||
if x.get("inventory_item_data", {}).get("player_stats", {})), | ||
None) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
import unittest | ||
from datetime import datetime, timedelta | ||
from mock import patch, MagicMock | ||
from pokemongo_bot.cell_workers.update_title_stats import UpdateTitleStats | ||
from tests import FakeBot | ||
|
||
|
||
class UpdateTitleStatsTestCase(unittest.TestCase): | ||
config = { | ||
'min_interval': 20, | ||
'stats': ['pokemon_evolved', 'pokemon_encountered', 'uptime', 'pokemon_caught', | ||
'stops_visited', 'km_walked', 'level', 'stardust_earned', 'level_completion', | ||
'xp_per_hour', 'pokeballs_thrown', 'highest_cp_pokemon', 'level_stats', | ||
'xp_earned', 'pokemon_unseen', 'most_perfect_pokemon', 'pokemon_stats', | ||
'pokemon_released'] | ||
} | ||
player_stats = { | ||
'level': 25, | ||
'prev_level_xp': 1250000, | ||
'next_level_xp': 1400000, | ||
'experience': 1337500 | ||
} | ||
|
||
def setUp(self): | ||
self.bot = FakeBot() | ||
self.worker = UpdateTitleStats(self.bot, self.config) | ||
|
||
def mock_metrics(self): | ||
self.bot.metrics = MagicMock() | ||
self.bot.metrics.runtime.return_value = timedelta(hours=15, minutes=42, seconds=13) | ||
self.bot.metrics.distance_travelled.return_value = 42.05 | ||
self.bot.metrics.xp_per_hour.return_value = 1337.42 | ||
self.bot.metrics.xp_earned.return_value = 424242 | ||
self.bot.metrics.visits = {'latest': 250, 'start': 30} | ||
self.bot.metrics.num_encounters.return_value = 130 | ||
self.bot.metrics.num_captures.return_value = 120 | ||
self.bot.metrics.releases = 30 | ||
self.bot.metrics.num_evolutions.return_value = 12 | ||
self.bot.metrics.num_new_mons.return_value = 3 | ||
self.bot.metrics.num_throws.return_value = 145 | ||
self.bot.metrics.earned_dust.return_value = 24069 | ||
self.bot.metrics.highest_cp = {'desc': 'highest_cp'} | ||
self.bot.metrics.most_perfect = {'desc': 'most_perfect'} | ||
|
||
def test_process_config(self): | ||
self.assertEqual(self.worker.min_interval, self.config['min_interval']) | ||
self.assertEqual(self.worker.displayed_stats, self.config['stats']) | ||
|
||
def test_should_display_no_next_update(self): | ||
self.worker.next_update = None | ||
|
||
self.assertTrue(self.worker._should_display()) | ||
|
||
@patch('pokemongo_bot.cell_workers.update_title_stats.datetime') | ||
def test_should_display_before_next_update(self, mock_datetime): | ||
now = datetime.now() | ||
mock_datetime.now.return_value = now - timedelta(seconds=20) | ||
self.worker.next_update = now | ||
|
||
self.assertFalse(self.worker._should_display()) | ||
|
||
@patch('pokemongo_bot.cell_workers.update_title_stats.datetime') | ||
def test_should_display_after_next_update(self, mock_datetime): | ||
now = datetime.now() | ||
mock_datetime.now.return_value = now + timedelta(seconds=20) | ||
self.worker.next_update = now | ||
|
||
self.assertTrue(self.worker._should_display()) | ||
|
||
@patch('pokemongo_bot.cell_workers.update_title_stats.datetime') | ||
def test_should_display_exactly_next_update(self, mock_datetime): | ||
now = datetime.now() | ||
mock_datetime.now.return_value = now | ||
self.worker.next_update = now | ||
|
||
self.assertTrue(self.worker._should_display()) | ||
|
||
@patch('pokemongo_bot.cell_workers.update_title_stats.datetime') | ||
def test_next_update_after_update_title(self, mock_datetime): | ||
now = datetime.now() | ||
mock_datetime.now.return_value = now | ||
old_next_display_value = self.worker.next_update | ||
self.worker._update_title('', 'linux2') | ||
|
||
self.assertNotEqual(self.worker.next_update, old_next_display_value) | ||
self.assertEqual(self.worker.next_update, | ||
now + timedelta(seconds=self.config['min_interval'])) | ||
|
||
@patch('pokemongo_bot.cell_workers.update_title_stats.stdout') | ||
def test_update_title_linux_osx(self, mock_stdout): | ||
self.worker._update_title('', 'linux') | ||
|
||
self.assertEqual(mock_stdout.write.call_count, 1) | ||
|
||
self.worker._update_title('', 'linux2') | ||
|
||
self.assertEqual(mock_stdout.write.call_count, 2) | ||
|
||
self.worker._update_title('', 'darwin') | ||
|
||
self.assertEqual(mock_stdout.write.call_count, 3) | ||
|
||
@unittest.skip("Didn't find a way to mock ctypes.windll.kernel32.SetConsoleTitleA") | ||
def test_update_title_win32(self): | ||
self.worker._update_title('', 'win32') | ||
|
||
def test_get_stats_title_player_stats_none(self): | ||
title = self.worker._get_stats_title(None) | ||
|
||
self.assertEqual(title, '') | ||
|
||
def test_get_stats_no_displayed_stats(self): | ||
self.worker.displayed_stats = [] | ||
title = self.worker._get_stats_title(self.player_stats) | ||
|
||
self.assertEqual(title, '') | ||
|
||
def test_get_stats(self): | ||
self.mock_metrics() | ||
|
||
title = self.worker._get_stats_title(self.player_stats) | ||
expected = 'Evolved 12 pokemon | Encountered 130 pokemon | Uptime : 15:42:13 | ' \ | ||
'Caught 120 pokemon | Visited 220 stops | 42.05km walked | Level 25 | ' \ | ||
'Earned 24,069 Stardust | 87,500 / 150,000 XP (58%) | 1,337 XP/h | ' \ | ||
'Threw 145 pokeballs | Highest CP pokemon : highest_cp | ' \ | ||
'Level 25 (87,500 / 150,000, 58%) | +424,242 XP | ' \ | ||
'Encountered 3 new pokemon | Most perfect pokemon : most_perfect | ' \ | ||
'Encountered 130 pokemon, 120 caught, 30 released, 12 evolved, ' \ | ||
'3 never seen before | Released 30 pokemon' | ||
|
||
self.assertEqual(title, expected) |