Skip to content

Commit

Permalink
Merge branch 'v0.3' into refine_rl_component_bundle
Browse files Browse the repository at this point in the history
  • Loading branch information
lihuoran committed Dec 27, 2022
2 parents 5d62151 + 9fd91ff commit f2a5170
Show file tree
Hide file tree
Showing 22 changed files with 221 additions and 158 deletions.
4 changes: 2 additions & 2 deletions examples/vm_scheduling/offline_lp/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from ilp_agent import IlpAgent

from maro.simulator import Env
from maro.simulator.scenarios.vm_scheduling import DecisionPayload
from maro.simulator.scenarios.vm_scheduling import DecisionEvent
from maro.simulator.scenarios.vm_scheduling.common import Action
from maro.utils import LogFormat, Logger, convert_dottable

Expand Down Expand Up @@ -46,7 +46,7 @@
env.set_seed(config.env.seed)

metrics: object = None
decision_event: DecisionPayload = None
decision_event: DecisionEvent = None
is_done: bool = False
action: Action = None

Expand Down
10 changes: 5 additions & 5 deletions examples/vm_scheduling/rl/env_sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from maro.rl.policy import AbsPolicy
from maro.rl.rollout import AbsAgentWrapper, AbsEnvSampler, CacheElement, SimpleAgentWrapper
from maro.simulator import Env
from maro.simulator.scenarios.vm_scheduling import AllocateAction, DecisionPayload, PostponeAction
from maro.simulator.scenarios.vm_scheduling import AllocateAction, DecisionEvent, PostponeAction

from .config import (
num_features,
Expand Down Expand Up @@ -62,7 +62,7 @@ def __init__(

def _get_global_and_agent_state_impl(
self,
event: DecisionPayload,
event: DecisionEvent,
tick: int = None,
) -> Tuple[Union[None, np.ndarray, List[object]], Dict[Any, Union[np.ndarray, List[object]]]]:
pm_state, vm_state = self._get_pm_state(), self._get_vm_state(event)
Expand All @@ -89,14 +89,14 @@ def _get_global_and_agent_state_impl(
def _translate_to_env_action(
self,
action_dict: Dict[Any, Union[np.ndarray, List[object]]],
event: DecisionPayload,
event: DecisionEvent,
) -> Dict[Any, object]:
if action_dict["AGENT"] == self.num_pms:
return {"AGENT": PostponeAction(vm_id=event.vm_id, postpone_step=1)}
else:
return {"AGENT": AllocateAction(vm_id=event.vm_id, pm_id=action_dict["AGENT"][0])}

def _get_reward(self, env_action_dict: Dict[Any, object], event: DecisionPayload, tick: int) -> Dict[Any, float]:
def _get_reward(self, env_action_dict: Dict[Any, object], event: DecisionEvent, tick: int) -> Dict[Any, float]:
action = env_action_dict["AGENT"]
conf = reward_shaping_conf if self._env == self._learn_env else test_reward_shaping_conf
if isinstance(action, PostponeAction): # postponement
Expand Down Expand Up @@ -139,7 +139,7 @@ def _get_vm_state(self, event):
],
)

def _get_allocation_reward(self, event: DecisionPayload, alpha: float, beta: float):
def _get_allocation_reward(self, event: DecisionEvent, alpha: float, beta: float):
vm_unit_price = self._env.business_engine._get_unit_price(
event.vm_cpu_cores_requirement,
event.vm_memory_requirement,
Expand Down
4 changes: 2 additions & 2 deletions examples/vm_scheduling/rule_based_algorithm/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
# Licensed under the MIT license.

from maro.simulator import Env
from maro.simulator.scenarios.vm_scheduling import AllocateAction, DecisionPayload, PostponeAction
from maro.simulator.scenarios.vm_scheduling import AllocateAction, DecisionEvent, PostponeAction
from maro.simulator.scenarios.vm_scheduling.common import Action


class VMSchedulingAgent(object):
def __init__(self, algorithm):
self._algorithm = algorithm

def choose_action(self, decision_event: DecisionPayload, env: Env) -> Action:
def choose_action(self, decision_event: DecisionEvent, env: Env) -> Action:
"""This method will determine whether to postpone the current VM or allocate a PM to the current VM."""
valid_pm_num: int = len(decision_event.valid_pms)

Expand Down
4 changes: 2 additions & 2 deletions examples/vm_scheduling/rule_based_algorithm/best_fit.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@
from rule_based_algorithm import RuleBasedAlgorithm

from maro.simulator import Env
from maro.simulator.scenarios.vm_scheduling import AllocateAction, DecisionPayload
from maro.simulator.scenarios.vm_scheduling import AllocateAction, DecisionEvent


class BestFit(RuleBasedAlgorithm):
def __init__(self, **kwargs):
super().__init__()
self._metric_type: str = kwargs["metric_type"]

def allocate_vm(self, decision_event: DecisionPayload, env: Env) -> AllocateAction:
def allocate_vm(self, decision_event: DecisionEvent, env: Env) -> AllocateAction:
# Use a rule to choose a valid PM.
chosen_idx: int = self._pick_pm_func(decision_event, env)
# Take action to allocate on the chose PM.
Expand Down
4 changes: 2 additions & 2 deletions examples/vm_scheduling/rule_based_algorithm/bin_packing.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from rule_based_algorithm import RuleBasedAlgorithm

from maro.simulator import Env
from maro.simulator.scenarios.vm_scheduling import AllocateAction, DecisionPayload
from maro.simulator.scenarios.vm_scheduling import AllocateAction, DecisionEvent


class BinPacking(RuleBasedAlgorithm):
Expand All @@ -24,7 +24,7 @@ def _init_bin(self):
self._bins = [[] for _ in range(self._pm_cpu_core_num + 1)]
self._bin_size = [0] * (self._pm_cpu_core_num + 1)

def allocate_vm(self, decision_event: DecisionPayload, env: Env) -> AllocateAction:
def allocate_vm(self, decision_event: DecisionEvent, env: Env) -> AllocateAction:
# Initialize the bin.
self._init_bin()

Expand Down
4 changes: 2 additions & 2 deletions examples/vm_scheduling/rule_based_algorithm/first_fit.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
from rule_based_algorithm import RuleBasedAlgorithm

from maro.simulator import Env
from maro.simulator.scenarios.vm_scheduling import AllocateAction, DecisionPayload
from maro.simulator.scenarios.vm_scheduling import AllocateAction, DecisionEvent


class FirstFit(RuleBasedAlgorithm):
def __init__(self, **kwargs):
super().__init__()

def allocate_vm(self, decision_event: DecisionPayload, env: Env) -> AllocateAction:
def allocate_vm(self, decision_event: DecisionEvent, env: Env) -> AllocateAction:
# Use a valid PM based on its order.
chosen_idx: int = decision_event.valid_pms[0]
# Take action to allocate on the chose PM.
Expand Down
4 changes: 2 additions & 2 deletions examples/vm_scheduling/rule_based_algorithm/random_pick.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
from rule_based_algorithm import RuleBasedAlgorithm

from maro.simulator import Env
from maro.simulator.scenarios.vm_scheduling import AllocateAction, DecisionPayload
from maro.simulator.scenarios.vm_scheduling import AllocateAction, DecisionEvent


class RandomPick(RuleBasedAlgorithm):
def __init__(self, **kwargs):
super().__init__()

def allocate_vm(self, decision_event: DecisionPayload, env: Env) -> AllocateAction:
def allocate_vm(self, decision_event: DecisionEvent, env: Env) -> AllocateAction:
valid_pm_num: int = len(decision_event.valid_pms)
# Random choose a valid PM.
chosen_idx: int = random.randint(0, valid_pm_num - 1)
Expand Down
4 changes: 2 additions & 2 deletions examples/vm_scheduling/rule_based_algorithm/round_robin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from rule_based_algorithm import RuleBasedAlgorithm

from maro.simulator import Env
from maro.simulator.scenarios.vm_scheduling import AllocateAction, DecisionPayload
from maro.simulator.scenarios.vm_scheduling import AllocateAction, DecisionEvent


class RoundRobin(RuleBasedAlgorithm):
Expand All @@ -15,7 +15,7 @@ def __init__(self, **kwargs):
kwargs["env"].snapshot_list["pms"][kwargs["env"].frame_index :: ["cpu_cores_capacity"]].shape[0]
)

def allocate_vm(self, decision_event: DecisionPayload, env: Env) -> AllocateAction:
def allocate_vm(self, decision_event: DecisionEvent, env: Env) -> AllocateAction:
# Choose the valid PM which index is next to the previous chose PM's index
chosen_idx: int = (self._prev_idx + 1) % self._pm_num
while chosen_idx not in decision_event.valid_pms:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
import abc

from maro.simulator import Env
from maro.simulator.scenarios.vm_scheduling import AllocateAction, DecisionPayload
from maro.simulator.scenarios.vm_scheduling import AllocateAction, DecisionEvent


class RuleBasedAlgorithm(object):
__metaclass__ = abc.ABCMeta

@abc.abstractmethod
def allocate_vm(self, decision_event: DecisionPayload, env: Env) -> AllocateAction:
def allocate_vm(self, decision_event: DecisionEvent, env: Env) -> AllocateAction:
"""This method will determine allocate which PM to the current VM."""
raise NotImplementedError
21 changes: 21 additions & 0 deletions maro/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.


class BaseDecisionEvent:
"""Base class for all decision events.
We made this design for the convenience of users. As a summary, there are two types of events in MARO:
- CascadeEvent & AtomEvent: used to drive the MARO Env / business engine.
- DecisionEvent: exposed to users as a means of communication.
The latter one serves as the `payload` of the former ones inside of MARO Env.
Therefore, the related namings might be a little bit tricky.
- Inside MARO Env: `decision_event` is actually a CascadeEvent. DecisionEvent is the payload of them.
- Outside MARO Env (for users): `decision_event` is a DecisionEvent.
"""


class BaseAction:
"""Base class for all action payloads"""
24 changes: 12 additions & 12 deletions maro/event_buffer/event_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@

import csv
from collections import defaultdict
from typing import Callable, List, Optional
from typing import Callable, List, Optional, cast

from ..common import BaseAction, BaseDecisionEvent
from .event import ActualEvent, AtomEvent, CascadeEvent
from .event_linked_list import EventLinkedList
from .event_pool import EventPool
Expand Down Expand Up @@ -122,9 +123,7 @@ def gen_atom_event(self, tick: int, event_type: object, payload: object = None)
Returns:
AtomEvent: Atom event object
"""
event = self._event_pool.gen(tick, event_type, payload, False)
assert isinstance(event, AtomEvent)
return event
return cast(AtomEvent, self._event_pool.gen(tick, event_type, payload, is_cascade=False))

def gen_cascade_event(self, tick: int, event_type: object, payload: object) -> CascadeEvent:
"""Generate an cascade event that used to hold immediate events that
Expand All @@ -138,31 +137,32 @@ def gen_cascade_event(self, tick: int, event_type: object, payload: object) -> C
Returns:
CascadeEvent: Cascade event object.
"""
event = self._event_pool.gen(tick, event_type, payload, True)
assert isinstance(event, CascadeEvent)
return event
return cast(CascadeEvent, self._event_pool.gen(tick, event_type, payload, is_cascade=True))

def gen_decision_event(self, tick: int, payload: object) -> CascadeEvent:
def gen_decision_event(self, tick: int, payload: BaseDecisionEvent) -> CascadeEvent:
"""Generate a decision event that will stop current simulation, and ask agent for action.
Args:
tick (int): Tick that the event will be processed.
payload (object): Payload of event, used to pass data to handlers.
payload (BaseDecisionEvent): Payload of event, used to pass data to handlers.
Returns:
CascadeEvent: Event object
"""
assert isinstance(payload, BaseDecisionEvent)
return self.gen_cascade_event(tick, MaroEvents.PENDING_DECISION, payload)

def gen_action_event(self, tick: int, payload: object) -> CascadeEvent:
def gen_action_event(self, tick: int, payloads: List[BaseAction]) -> CascadeEvent:
"""Generate an event that used to dispatch action to business engine.
Args:
tick (int): Tick that the event will be processed.
payload (object): Payload of event, used to pass data to handlers.
payloads (List[BaseAction]): Payloads of event, used to pass data to handlers.
Returns:
CascadeEvent: Event object
"""
return self.gen_cascade_event(tick, MaroEvents.TAKE_ACTION, payload)
assert isinstance(payloads, list)
assert all(isinstance(p, BaseAction) for p in payloads)
return self.gen_cascade_event(tick, MaroEvents.TAKE_ACTION, payloads)

def register_event_handler(self, event_type: object, handler: Callable) -> None:
"""Register an event with handler, when there is an event need to be processed,
Expand Down
Loading

0 comments on commit f2a5170

Please sign in to comment.