diff --git a/sc2/bot_ai_internal.py b/sc2/bot_ai_internal.py index 075f4ac7..96753404 100644 --- a/sc2/bot_ai_internal.py +++ b/sc2/bot_ai_internal.py @@ -21,6 +21,7 @@ ALL_GAS, IS_PLACEHOLDER, TERRAN_STRUCTURES_REQUIRE_SCV, + WORKER_TYPES, FakeEffectID, abilityid_to_unittypeid, geyser_ids, @@ -110,6 +111,7 @@ def _initialize_variables(self): self._enemy_units_previous_map: Dict[int, Unit] = {} self._enemy_structures_previous_map: Dict[int, Unit] = {} self._all_units_previous_map: Dict[int, Unit] = {} + self._unit_abilities: Dict[int, Set[AbilityId]] = {} self._previous_upgrades: Set[UpgradeId] = set() self._expansion_positions_list: List[Point2] = [] self._resource_location_to_expansion_position_dict: Dict[Point2, Point2] = {} @@ -469,7 +471,7 @@ def _prepare_first_step(self): self._time_before_step: float = time.perf_counter() @final - def _prepare_step(self, state, proto_game_info): + async def _prepare_step(self, state, proto_game_info): """ :param state: :param proto_game_info: @@ -505,6 +507,12 @@ def _prepare_step(self, state, proto_game_info): self.idle_worker_count: int = state.common.idle_worker_count self.army_count: int = state.common.army_count + + self._unit_abilities = await self.client.query_available_abilities_with_tag( + self.all_own_units, + ignore_resource_requirements=True, + ) + self._time_before_step: float = time.perf_counter() if self.enemy_race == Race.Random and self.all_enemy_units: @@ -534,8 +542,6 @@ def _prepare_units(self): self.techlab_tags: Set[int] = set() self.reactor_tags: Set[int] = set() - worker_types: Set[UnitTypeId] = {UnitTypeId.DRONE, UnitTypeId.DRONEBURROWED, UnitTypeId.SCV, UnitTypeId.PROBE} - index: int = 0 for unit in self.state.observation_raw.units: if unit.is_blip: @@ -596,7 +602,7 @@ def _prepare_units(self): self.reactor_tags.add(unit_obj.tag) else: self.units.append(unit_obj) - if unit_id in worker_types: + if unit_id in WORKER_TYPES: self.workers.append(unit_obj) elif unit_id == UnitTypeId.LARVA: self.larva.append(unit_obj) @@ -646,7 +652,7 @@ async def _advance_steps(self, steps: int): state = await self.client.observation() gs = GameState(state.observation) proto_game_info = await self.client._execute(game_info=sc_pb.RequestGameInfo()) - self._prepare_step(gs, proto_game_info) + await self._prepare_step(gs, proto_game_info) await self.issue_events() @final diff --git a/sc2/constants.py b/sc2/constants.py index 9769b7bc..fdc7dfa8 100644 --- a/sc2/constants.py +++ b/sc2/constants.py @@ -7,6 +7,8 @@ from sc2.ids.unit_typeid import UnitTypeId from sc2.ids.upgrade_id import UpgradeId +WORKER_TYPES: Set[UnitTypeId] = {UnitTypeId.DRONE, UnitTypeId.DRONEBURROWED, UnitTypeId.SCV, UnitTypeId.PROBE} + mineral_ids: Set[int] = { UnitTypeId.RICHMINERALFIELD.value, UnitTypeId.RICHMINERALFIELD750.value, diff --git a/sc2/main.py b/sc2/main.py index 5e1b3621..2ada294d 100644 --- a/sc2/main.py +++ b/sc2/main.py @@ -130,7 +130,7 @@ async def initialize_first_step() -> Optional[Result]: gs = GameState(state.observation) proto_game_info = await client._execute(game_info=sc_pb.RequestGameInfo()) try: - ai._prepare_step(gs, proto_game_info) + await ai._prepare_step(gs, proto_game_info) await ai.on_before_start() ai._prepare_first_step() await ai.on_start() @@ -191,7 +191,7 @@ async def run_bot_iteration(iteration: int): await ai.on_end(Result.Tie) return Result.Tie proto_game_info = await client._execute(game_info=sc_pb.RequestGameInfo()) - ai._prepare_step(gs, proto_game_info) + await ai._prepare_step(gs, proto_game_info) await run_bot_iteration(iteration) # Main bot loop @@ -252,7 +252,7 @@ async def _play_replay(client, ai, realtime=False, player_id=0): return client._game_result[player_id] gs = GameState(state.observation) proto_game_info = await client._execute(game_info=sc_pb.RequestGameInfo()) - ai._prepare_step(gs, proto_game_info) + await ai._prepare_step(gs, proto_game_info) ai._prepare_first_step() try: await ai.on_start() @@ -283,7 +283,7 @@ async def _play_replay(client, ai, realtime=False, player_id=0): logger.debug(f"Score: {gs.score.score}") proto_game_info = await client._execute(game_info=sc_pb.RequestGameInfo()) - ai._prepare_step(gs, proto_game_info) + await ai._prepare_step(gs, proto_game_info) logger.debug(f"Running AI step, it={iteration} {gs.game_loop * 0.725 * (1 / 16):.2f}s") diff --git a/sc2/unit.py b/sc2/unit.py index 01d65d28..8b4ce2aa 100644 --- a/sc2/unit.py +++ b/sc2/unit.py @@ -576,6 +576,34 @@ def target_in_range(self, target: Unit, bonus_distance: float = 0) -> bool: (self.radius + target.radius + unit_attack_range + bonus_distance)**2 ) + @cached_property + def abilities(self) -> Set[AbilityId]: + """Returns a set of all abilities the unit can execute. + + If the tech requirement is not met or the ability is on cooldown, the ability is not contained in this set. + Resource requirement is ignored and needs to be manually checked. + + Examples:: + + for stalker in self.units(UnitTypeId.STALKER): + # False if blink is on cooldown or not researched + if AbilityId.EFFECT_BLINK_STALKER in stalker.abilities: + stalker(EFFECT_BLINK_STALKER, target_position) + + for roach in self.units(UnitTypeId.ROACH): + if self.can_afford(UnitTypeId.RAVAGER): + # Automatically subtract 25 from self.minerals and 75 from self.vespene in this loop + roach.train(UnitTypeId.RAVAGER) + """ + if not self.is_mine: + warnings.warn( + f"Abilities are known only for your own units, but tried to get abilities for {self}.", + RuntimeWarning, + stacklevel=1, + ) + return set() + return self._bot_object._unit_abilities[self.tag] + def in_ability_cast_range( self, ability_id: AbilityId, target: Union[Unit, Point2], bonus_distance: float = 0 ) -> bool: @@ -1249,8 +1277,22 @@ def train( queue: bool = False, can_afford_check: bool = False, ) -> Union[UnitCommand, bool]: - """Orders unit to train another 'unit'. - Usage: COMMANDCENTER.train(SCV) + """Orders unit to train another 'unit'. Can also be used for unit and structure morphs. + Examples:: + + for cc in self.townhalls: + cc.train(SCV) + + for cc in self.townhalls(UnitTypeId.COMMANDCENTER): + # Check if we can afford it - does not check the tech requirement (in this case 'barracks') + if cc.is_idle and self.can_afford(UnitTypeId.ORBITALCOMMAND): + # Automatically subtract 150 from self.minerals in this loop + cc.train(UnitTypeId.ORBITALCOMMAND) + + for roach in self.units(UnitTypeId.ROACH): + if self.can_afford(UnitTypeId.RAVAGER): + # Automatically subtract 25 from self.minerals and 75 from self.vespene in this loop + roach.train(UnitTypeId.RAVAGER) :param unit: :param queue: @@ -1270,12 +1312,16 @@ def build( queue: bool = False, can_afford_check: bool = False, ) -> Union[UnitCommand, bool]: - """Orders unit to build another 'unit' at 'position'. - Usage:: + """Orders unit to build another 'unit' at 'position'. Can also be used for unit and structure morphs. + Examples:: - SCV.build(COMMANDCENTER, position) - # Target for refinery, assimilator and extractor needs to be the vespene geysir unit, not its position - SCV.build(REFINERY, target_vespene_geysir) + SCV.build(UnitTypeId.COMMANDCENTER, position) + + for cc in self.townhalls(UnitTypeId.COMMANDCENTER): + # Check if we can afford it - does not check the tech requirement (in this case 'barracks') + if cc.is_idle and self.can_afford(UnitTypeId.ORBITALCOMMAND): + # Automatically subtract 150 from self.minerals in this loop + cc.build(UnitTypeId.ORBITALCOMMAND) :param unit: :param position: diff --git a/test/autotest_bot.py b/test/autotest_bot.py index 7f176015..7b10e2c8 100644 --- a/test/autotest_bot.py +++ b/test/autotest_bot.py @@ -109,6 +109,39 @@ async def test_botai_properties(self): for location in self.enemy_start_locations: assert location in self.expansion_locations_list + # Test if units and structures have expected abilities + standard_scv_abilities = { + AbilityId.ATTACK_ATTACK, + AbilityId.EFFECT_REPAIR_SCV, + AbilityId.EFFECT_SPRAY_TERRAN, + AbilityId.HARVEST_GATHER_SCV, + AbilityId.HOLDPOSITION_HOLD, + AbilityId.MOVE_MOVE, + AbilityId.PATROL_PATROL, + AbilityId.SMART, + AbilityId.STOP_STOP, + AbilityId.TERRANBUILD_COMMANDCENTER, + AbilityId.TERRANBUILD_ENGINEERINGBAY, + AbilityId.TERRANBUILD_REFINERY, + AbilityId.TERRANBUILD_SUPPLYDEPOT, + } + for scv in self.units: + assert isinstance(scv.abilities, set) + if scv.is_carrying_minerals: + assert scv.abilities == standard_scv_abilities | {AbilityId.HARVEST_RETURN_SCV} + else: + assert scv.abilities == standard_scv_abilities + + for cc in self.townhalls: + assert isinstance(cc.abilities, set) + assert cc.abilities == { + AbilityId.COMMANDCENTERTRAIN_SCV, + AbilityId.LIFT_COMMANDCENTER, + AbilityId.LOADALL_COMMANDCENTER, + AbilityId.RALLY_COMMANDCENTER, + AbilityId.SMART, + } + self.tests_done_by_name.add("test_botai_properties") # Test BotAI functions @@ -438,8 +471,6 @@ async def test_botai_actions11(self): # Test if structures_without_construction_SCVs works after killing the scv async def test_botai_actions12(self): - map_center: Point2 = self.game_info.map_center - # Wait till can afford depot while not self.can_afford(UnitTypeId.SUPPLYDEPOT): await self.client.debug_all_resources() diff --git a/test/benchmark_bot_ai_init.py b/test/benchmark_bot_ai_init.py index 3a92e0c1..16a34805 100644 --- a/test/benchmark_bot_ai_init.py +++ b/test/benchmark_bot_ai_init.py @@ -1,13 +1,16 @@ from test.test_pickled_data import MAPS, build_bot_object_from_pickle_data, load_map_pickle_data from typing import Any, List, Tuple +import pytest -def _test_run_bot_ai_init_on_all_maps(pickle_data: List[Tuple[Any, Any, Any]]): + +async def _test_run_bot_ai_init_on_all_maps(pickle_data: List[Tuple[Any, Any, Any]]): for data in pickle_data: - build_bot_object_from_pickle_data(*data) + await build_bot_object_from_pickle_data(*data) -def test_bench_bot_ai_init(benchmark): +@pytest.mark.asyncio +async def test_bench_bot_ai_init(benchmark): # Load pickle files outside of benchmark map_pickle_data: List[Tuple[Any, Any, Any]] = [load_map_pickle_data(path) for path in MAPS] _result = benchmark(_test_run_bot_ai_init_on_all_maps, map_pickle_data) diff --git a/test/benchmark_prepare_units.py b/test/benchmark_prepare_units.py index a6229bed..7d0dedbf 100644 --- a/test/benchmark_prepare_units.py +++ b/test/benchmark_prepare_units.py @@ -1,16 +1,19 @@ from test.test_pickled_data import MAPS, get_map_specific_bot from typing import TYPE_CHECKING, List +import pytest + if TYPE_CHECKING: from sc2.bot_ai import BotAI -def _run_prepare_units(bot_objects: List["BotAI"]): +async def _run_prepare_units(bot_objects: List["BotAI"]): for bot_object in bot_objects: - bot_object._prepare_units() + await bot_object._prepare_units() -def test_bench_prepare_units(benchmark): +@pytest.mark.asyncio +async def test_bench_prepare_units(benchmark): bot_objects = [get_map_specific_bot(map_) for map_ in MAPS] _result = benchmark(_run_prepare_units, bot_objects) diff --git a/test/test_pickled_data.py b/test/test_pickled_data.py index de188e43..8ce2750b 100644 --- a/test/test_pickled_data.py +++ b/test/test_pickled_data.py @@ -16,7 +16,9 @@ from contextlib import suppress from pathlib import Path from typing import Any, List, Tuple +from unittest.mock import patch +import pytest from google.protobuf.internal import api_implementation from hypothesis import given, settings from hypothesis import strategies as st @@ -48,7 +50,7 @@ def load_map_pickle_data(map_path: Path) -> Tuple[Any, Any, Any]: return raw_game_data, raw_game_info, raw_observation -def build_bot_object_from_pickle_data(raw_game_data, raw_game_info, raw_observation) -> BotAI: +async def build_bot_object_from_pickle_data(raw_game_data, raw_game_info, raw_observation) -> BotAI: # Build fresh bot object, and load the pickled data into the bot object bot = BotAI() game_data = GameData(raw_game_data.data) @@ -57,14 +59,15 @@ def build_bot_object_from_pickle_data(raw_game_data, raw_game_info, raw_observat bot._initialize_variables() client = Client(True) bot._prepare_start(client=client, player_id=1, game_info=game_info, game_data=game_data) - bot._prepare_step(state=game_state, proto_game_info=raw_game_info) + with patch.object(Client, "query_available_abilities_with_tag", return_value={}): + await bot._prepare_step(state=game_state, proto_game_info=raw_game_info) return bot -def get_map_specific_bot(map_path: Path) -> BotAI: +async def get_map_specific_bot(map_path: Path) -> BotAI: assert map_path in MAPS data = load_map_pickle_data(map_path) - return build_bot_object_from_pickle_data(*data) + return await build_bot_object_from_pickle_data(*data) def test_protobuf_implementation(): @@ -74,8 +77,9 @@ def test_protobuf_implementation(): assert api_implementation.Type() == "cpp" -def test_bot_ai(): - bot: BotAI = get_map_specific_bot(random.choice(MAPS)) +@pytest.mark.asyncio +async def test_bot_ai(): + bot: BotAI = await get_map_specific_bot(random.choice(MAPS)) # Test initial bot attributes at game start # Properties from _prepare_start @@ -407,8 +411,9 @@ def assert_cost(item_id, real_cost: Cost): assert bot.calculate_supply_cost(UnitTypeId.LURKERMP) == 1 -def test_game_info(): - bot: BotAI = get_map_specific_bot(random.choice(MAPS)) +@pytest.mark.asyncio +async def test_game_info(): + bot: BotAI = await get_map_specific_bot(random.choice(MAPS)) # Test if main base ramp works bot.game_info.map_ramps, bot.game_info.vision_blockers = bot.game_info._find_ramps_and_vision_blockers() game_info: GameInfo = bot.game_info @@ -431,8 +436,9 @@ def test_game_info(): assert game_info.player_start_location -def test_game_data(): - bot: BotAI = get_map_specific_bot(random.choice(MAPS)) +@pytest.mark.asyncio +async def test_game_data(): + bot: BotAI = await get_map_specific_bot(random.choice(MAPS)) game_data = bot.game_data assert game_data.abilities @@ -472,8 +478,9 @@ def test_game_data(): assert isinstance(upgrade_data.cost, Cost) -def test_game_state(): - bot: BotAI = get_map_specific_bot(random.choice(MAPS)) +@pytest.mark.asyncio +async def test_game_state(): + bot: BotAI = await get_map_specific_bot(random.choice(MAPS)) state = bot.state assert not state.actions @@ -495,8 +502,9 @@ def test_game_state(): assert not state.effects -def test_pixelmap(): - bot: BotAI = get_map_specific_bot(random.choice(MAPS)) +@pytest.mark.asyncio +async def test_pixelmap(): + bot: BotAI = await get_map_specific_bot(random.choice(MAPS)) pathing_grid: PixelMap = bot.game_info.pathing_grid assert pathing_grid.bits_per_pixel assert pathing_grid.bytes_per_pixel == pathing_grid.bits_per_pixel // 8 @@ -510,20 +518,23 @@ def test_pixelmap(): pathing_grid.print() -def test_blip(): - bot: BotAI = get_map_specific_bot(random.choice(MAPS)) +@pytest.mark.asyncio +async def test_blip(): + bot: BotAI = await get_map_specific_bot(random.choice(MAPS)) # TODO this needs to be done in a test bot that has a sensor tower # blips are enemy dots on the minimap that are out of vision -def test_score(): - bot: BotAI = get_map_specific_bot(random.choice(MAPS)) +@pytest.mark.asyncio +async def test_score(): + bot: BotAI = await get_map_specific_bot(random.choice(MAPS)) assert bot.state.score assert bot.state.score.summary -def test_unit(): - bot: BotAI = get_map_specific_bot(random.choice(MAPS)) +@pytest.mark.asyncio +async def test_unit(): + bot: BotAI = await get_map_specific_bot(random.choice(MAPS)) scv: Unit = bot.workers.random townhall: Unit = bot.townhalls.first @@ -793,8 +804,9 @@ def test_unit(): # assert marauder1.calculate_damage_vs_target(marauder_15_hp, include_overkill_damage=False)[0] == 15 -def test_units(): - bot: BotAI = get_map_specific_bot(random.choice(MAPS)) +@pytest.mark.asyncio +async def test_units(): + bot: BotAI = await get_map_specific_bot(random.choice(MAPS)) scvs = bot.workers townhalls = bot.townhalls @@ -920,7 +932,8 @@ def test_units(): assert scvs.by_tag(scvs[0].tag) -def test_dicts(): +@pytest.mark.asyncio +async def test_dicts(): # May be missing but that should not fail the tests try: from sc2.dicts.unit_research_abilities import RESEARCH_INFO @@ -928,7 +941,7 @@ def test_dicts(): logger.info(f"Import error: dict sc2/dicts/unit_research_abilities.py is missing!") return - bot: BotAI = get_map_specific_bot(random.choice(MAPS)) + bot: BotAI = await get_map_specific_bot(random.choice(MAPS)) unit_id: UnitTypeId data: dict diff --git a/test/test_pickled_ramp.py b/test/test_pickled_ramp.py index 3c9f2b0a..b7dd7c5b 100644 --- a/test/test_pickled_ramp.py +++ b/test/test_pickled_ramp.py @@ -12,8 +12,10 @@ from pathlib import Path from test.test_pickled_data import MAPS, get_map_specific_bot +import pytest from loguru import logger +from sc2.bot_ai import BotAI from sc2.game_info import Ramp from sc2.position import Point2 from sc2.unit import Unit @@ -36,8 +38,9 @@ class TestClass: # Load all pickle files and convert them into bot objects from raw data (game_data, game_info, game_state) scenarios = [(map_path.name, {"map_path": map_path}) for map_path in MAPS] - def test_main_base_ramp(self, map_path: Path): - bot = get_map_specific_bot(map_path) + @pytest.mark.asyncio + async def test_main_base_ramp(self, map_path: Path): + bot: BotAI = await get_map_specific_bot(map_path) bot.game_info.map_ramps, bot.game_info.vision_blockers = bot.game_info._find_ramps_and_vision_blockers() # Test if main ramp works for all spawns @@ -85,8 +88,9 @@ def test_main_base_ramp(self, map_path: Path): assert ramp.protoss_wall_buildings == frozenset() assert ramp.protoss_wall_warpin is None - def test_bot_ai(self, map_path: Path): - bot = get_map_specific_bot(map_path) + @pytest.mark.asyncio + async def test_bot_ai(self, map_path: Path): + bot: BotAI = await get_map_specific_bot(map_path) # Recalculate and time expansion locations t0 = time.perf_counter()