Skip to content

Commit

Permalink
Allow to keep stronger pokemon (#1302)
Browse files Browse the repository at this point in the history
* Allow to keep stronger pokemon. It is woring both with CP and IV

* Remove not needed code

* Add example of keep_best_iv into configuration

* Add delay before pokemon transfering

* Resolve merge conflicts

* Use config.release_pokemon to determine should we release pokemon or not

* Fix a bug
  • Loading branch information
Nihisil authored and solderzzc committed Jul 28, 2016
1 parent 4c46ad7 commit b7e133c
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 42 deletions.
6 changes: 5 additions & 1 deletion configs/config.json.example
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@
"release": {
"any": {"release_below_cp": 0, "release_below_iv": 0, "logic": "or"},
"// Example of always releasing Rattata:": {},
"// Rattata": { "always_release" : true }
"// Rattata": { "always_release" : true },
"// Example of keeping stronger (based on CP) Pidgey:": {},
"// Pidgey": { "keep_best_cp" : true },
"// Example of keeping stronger (based on IV) Zubat:": {},
"// Zubat": { "keep_best_iv" : true }
}
}
118 changes: 79 additions & 39 deletions pokemongo_bot/cell_workers/pokemon_catch_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pokemon_transfer_worker import PokemonTransferWorker

class PokemonCatchWorker(object):

BAG_FULL = 'bag_full'
NO_POKEBALLS = 'no_pokeballs'

Expand All @@ -23,28 +24,26 @@ def __init__(self, pokemon, bot):
self.spawn_point_guid = ''
self.response_key = ''
self.response_status_key = ''
self.transfer_worker = None

def work(self):

encounter_id = self.pokemon['encounter_id']

response_dict = self.create_encounter_api_call()
self.transfer_worker = PokemonTransferWorker(self)

if response_dict and 'responses' in response_dict:
if self.response_key in response_dict['responses']:
if self.response_status_key in response_dict['responses'][self.response_key]:
if response_dict['responses'][self.response_key][self.response_status_key] is 7:
if self.config.release_pokemon:
logger.log('Pokemon Bag is full!', 'red')
worker = PokemonTransferWorker(self)
worker.work()
self.transfer_worker.work()

else:
raise RuntimeError('Pokemon Bag is full!')

if response_dict['responses'][self.response_key][self.response_status_key] is 1:
cp = 0
total_IV = 0
if 'wild_pokemon' in response_dict['responses'][self.response_key] or 'pokemon_data' in \
response_dict['responses'][self.response_key]:
if self.response_key == 'ENCOUNTER':
Expand All @@ -54,46 +53,40 @@ def work(self):

catch_rate = response_dict['responses'][self.response_key]['capture_probability'][
'capture_probability'] # 0 = pokeballs, 1 great balls, 3 ultra balls

pokemon_data = None
if 'pokemon_data' in pokemon and 'cp' in pokemon['pokemon_data']:
cp = pokemon['pokemon_data']['cp']
pokemon_data = pokemon['pokemon_data']
cp = pokemon_data['cp']

# make sure we catch any missing iv information
if 'individual_stamina' not in pokemon['pokemon_data']:
pokemon['pokemon_data']['individual_stamina'] = 0
if 'individual_attack' not in pokemon['pokemon_data']:
pokemon['pokemon_data']['individual_attack'] = 0
if 'individual_defense' not in pokemon['pokemon_data']:
pokemon['pokemon_data']['individual_defense'] = 0
if 'individual_stamina' not in pokemon_data:
pokemon_data['individual_stamina'] = 0
if 'individual_attack' not in pokemon_data:
pokemon_data['individual_attack'] = 0
if 'individual_defense' not in pokemon_data:
pokemon_data['individual_defense'] = 0

iv_stats = ['individual_attack', 'individual_defense', 'individual_stamina']
individual_attack = 0

individual_attack = pokemon['pokemon_data'].get("individual_attack", 0)
individual_stamina = pokemon['pokemon_data'].get("individual_stamina", 0)
individual_attack = pokemon_data.get("individual_attack", 0)
individual_stamina = pokemon_data.get("individual_stamina", 0)

iv_display = '{}/{}/{}'.format(
individual_stamina,
individual_attack,
pokemon['pokemon_data']['individual_defense']
pokemon_data['individual_defense']
)

for individual_stat in iv_stats:
try:
total_IV += pokemon['pokemon_data'][individual_stat]
except Exception:
pokemon['pokemon_data'][individual_stat] = 0
continue

pokemon_potential = round((total_IV / 45.0), 2)
pokemon_num = int(pokemon['pokemon_data'][
'pokemon_id']) - 1
pokemon_name = self.pokemon_list[
int(pokemon_num)]['Name']
pokemon_potential = self.pokemon_potential(pokemon_data)
pokemon_num = int(pokemon_data['pokemon_id']) - 1
pokemon_name = self.pokemon_list[int(pokemon_num)]['Name']
logger.log('A Wild {} appeared! [CP {}] [Potential {}]'.format(
pokemon_name, cp, pokemon_potential), 'yellow')

logger.log('IV [Stamina/Attack/Defense] = [{}]'.format(iv_display))
pokemon['pokemon_data']['name'] = pokemon_name
pokemon_data['name'] = pokemon_name
# Simulate app
sleep(3)

Expand Down Expand Up @@ -167,6 +160,8 @@ def work(self):
))

id_list1 = self.count_pokemon_inventory()
max_criteria_pokemon_list = self.get_max_criteria_pokemon_list()

self.api.catch_pokemon(encounter_id=encounter_id,
pokeball=pokeball,
normalized_reticle_size=1.950,
Expand Down Expand Up @@ -194,9 +189,10 @@ def work(self):

self.bot.metrics.captured_pokemon(pokemon_name, cp, iv_display, pokemon_potential)

logger.log('Captured {}! [CP {}] [{}]'.format(
logger.log('Captured {}! [CP {}] [Potential {}] [{}]'.format(
pokemon_name,
cp,
pokemon_potential,
iv_display
), 'blue')

Expand All @@ -219,34 +215,78 @@ def work(self):
logger.log(
'Failed to evolve {}!'.format(pokemon_name))

self.transfer_worker.check_stronger_pokemon(pokemon_name,
pokemon_data,
max_criteria_pokemon_list)

break
time.sleep(5)

def count_pokemon_inventory(self):
self.bot.latest_inventory = None # Need accurate count of balls/berries/pokemons
response_dict = self.bot.get_inventory()
# don't use cached bot.get_inventory() here
# because we need to have actual information in capture logic
self.api.get_inventory()
response_dict = self.api.call()

id_list = []
return self.counting_pokemon(response_dict, id_list)
callback = lambda pokemon: id_list.append(pokemon['id'])
self._foreach_pokemon_in_inventory(response_dict, callback)
return id_list

def counting_pokemon(self, response_dict, id_list):
def _foreach_pokemon_in_inventory(self, response_dict, callback):
try:
reduce(dict.__getitem__, [
"responses", "GET_INVENTORY", "inventory_delta", "inventory_items"], response_dict)
"responses", "GET_INVENTORY", "inventory_delta", "inventory_items"], response_dict)
except KeyError:
pass
else:
for item in response_dict['responses']['GET_INVENTORY']['inventory_delta']['inventory_items']:
try:
reduce(dict.__getitem__, [
"inventory_item_data", "pokemon_data"], item)
"inventory_item_data", "pokemon_data"], item)
except KeyError:
pass
else:
pokemon = item['inventory_item_data']['pokemon_data']
if pokemon.get('is_egg', False):
continue
id_list.append(pokemon['id'])
if not pokemon.get('is_egg', False):
callback(pokemon)

def get_max_criteria_pokemon_list(self):
"""
Takes a get_inventory request result and returns a list of pokemons and their data,
whereas the list contains no duplicate pokemons.
If there are duplicate pokemons in the inventory the strongest one (max CP) is returned.
"""
response_dict = self.bot.get_inventory()
pokemon_map = {}
callback = lambda pokemon: self._add_or_replace_pokemon(pokemon_map, pokemon)
self._foreach_pokemon_in_inventory(response_dict, callback)
return pokemon_map

def _add_or_replace_pokemon(self, pokemon_map, pokemon):
"""
Takes a map(pokemon_id, pokemon) and a pokemon and either adds the pokemon
if it's not yet in the map or if it's stronger than the one in the map.
"""
pokemon_id = pokemon['pokemon_id']
if pokemon_id in pokemon_map:
if self.transfer_worker.is_greater_by_criteria(pokemon, pokemon_map[pokemon_id]):
pokemon_map[pokemon_id] = pokemon
else:
pokemon_map[pokemon_id] = pokemon

return id_list
def pokemon_potential(self, pokemon_data):
total_iv = 0
iv_stats = ['individual_attack', 'individual_defense', 'individual_stamina']

for individual_stat in iv_stats:
try:
total_iv += pokemon_data[individual_stat]
except:
pokemon_data[individual_stat] = 0
continue

return round((total_iv / 45.0), 2)

def should_capture_pokemon(self, pokemon_name, cp, iv, response_dict):
catch_config = self._get_catch_config_for(pokemon_name)
Expand Down
68 changes: 66 additions & 2 deletions pokemongo_bot/cell_workers/pokemon_transfer_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
from pokemongo_bot import logger

class PokemonTransferWorker(object):

def __init__(self, bot):
self.config = bot.config
self.pokemon_list = bot.pokemon_list
self.api = bot.api
self.bot = bot

def work(self):
if not self.config.release_pokemon:
Expand All @@ -30,8 +32,7 @@ def work(self):
if self.should_release_pokemon(pokemon_name, pokemon_cp, pokemon_potential):
logger.log('Exchanging {} for candy!'.format(
pokemon_name), 'green')
self.api.release_pokemon(pokemon_id=pokemon_data['id'])
response_dict = self.api.call()
self.release_pokemon(pokemon_data['id'])
action_delay(self.config.action_wait_min, self.config.action_wait_max)

def _release_pokemon_get_groups(self):
Expand Down Expand Up @@ -117,6 +118,69 @@ def should_release_pokemon(self, pokemon_name, cp, iv):

return logic_to_function[cp_iv_logic](*release_results.values())

def check_stronger_pokemon(self, pokemon_name, pokemon_data, max_criteria_pokemon_list):
if not self.config.release_pokemon:
return

release_config = self._get_release_config_for(pokemon_name)
if release_config.get('keep_best_cp', False) or release_config.get('keep_best_iv', False):
if release_config.get('keep_best_cp', False) and release_config.get('keep_best_iv', False):
logger.log("keep_best_cp and keep_best_iv can't be set true at the same time. Ignore this settings",
"red")
else:
pokemon_id = pokemon_data['pokemon_id']
display_pokemon = '{} [CP {}] [Potential {}]'.format(pokemon_name,
pokemon_data['cp'],
self.pokemon_potential(pokemon_data))
if pokemon_id in max_criteria_pokemon_list:
owned = max_criteria_pokemon_list[pokemon_id]
owned_display = '{} [CP {}] [Potential {}]'.format(pokemon_name,
owned['cp'],
self.pokemon_potential(owned))

better = self.is_greater_by_criteria(pokemon_data, owned)
if better:
logger.log('Owning weaker {}. Replacing it with {}!'.format(owned_display, display_pokemon), 'blue')
action_delay(self.config.action_wait_min, self.config.action_wait_max)
self.release_pokemon(pokemon_data['pokemon_id'])
logger.log('Weaker {} has been exchanged for candy!'.format(owned_display), 'blue')
return False
else:
logger.log('Owning better {} already!'.format(owned_display), 'blue')
return True
else:
logger.log('Not owning {}. Keeping it!'.format(display_pokemon), 'blue')
return False

def is_greater_by_criteria(self, pokemon, other_pokemon):
pokemon_num = int(pokemon['pokemon_id']) - 1
pokemon_name = self.pokemon_list[int(pokemon_num)]['Name']

release_config = self._get_release_config_for(pokemon_name)
if release_config.get('keep_best_cp', False):
return pokemon['cp'] > other_pokemon['cp']
elif release_config.get('keep_best_iv', False):
return self.pokemon_potential(pokemon) > self.pokemon_potential(other_pokemon)
else:
return False

def pokemon_potential(self, pokemon_data):
total_iv = 0
iv_stats = ['individual_attack', 'individual_defense', 'individual_stamina']

for individual_stat in iv_stats:
try:
total_iv += pokemon_data[individual_stat]
except:
pokemon_data[individual_stat] = 0
continue

return round((total_iv / 45.0), 2)

def release_pokemon(self, pokemon_id):
self.api.release_pokemon(pokemon_id=pokemon_id)
response_dict = self.api.call()

def _get_release_config_for(self, pokemon):
release_config = self.config.release.get(pokemon)
if not release_config:
Expand Down

0 comments on commit b7e133c

Please sign in to comment.