Rl v3 hanging issue fix #455

Merged: 18 commits, Jan 14, 2022
27 changes: 0 additions & 27 deletions docker-compose.yml

This file was deleted.

4 changes: 2 additions & 2 deletions docker_files/dev.df
@@ -1,4 +1,4 @@
FROM ubuntu:18.04
FROM python:3.7.12-buster
WORKDIR /maro

# Install Apt packages
@@ -9,7 +9,7 @@ RUN apt-get install -y gcc
RUN apt-get install -y libcurl4 libcurl4-openssl-dev libssl-dev curl
RUN apt-get install -y libzmq3-dev
RUN apt-get install -y python3-pip
RUN apt-get install -y python3-dev libpython3.6-dev python-numpy
RUN apt-get install -y python3-dev libpython3.7-dev python-numpy
RUN rm -rf /var/lib/apt/lists/*

# Install Python packages
4 changes: 2 additions & 2 deletions examples/rl/cim/__init__.py
@@ -3,6 +3,6 @@

from .callbacks import post_collect, post_evaluate
from .env_sampler import agent2policy, get_env_sampler
from .policies_v2 import policy_func_dict
from .policies import policy_creator, trainer_creator

__all__ = ["agent2policy", "post_collect", "post_evaluate", "get_env_sampler", "policy_func_dict"]
__all__ = ["agent2policy", "post_collect", "post_evaluate", "get_env_sampler", "policy_creator", "trainer_creator"]
7 changes: 0 additions & 7 deletions examples/rl/cim/callbacks.py
@@ -1,13 +1,6 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import time
from os import makedirs
from os.path import dirname, join, realpath

log_dir = join(dirname(realpath(__file__)), "log", str(time.time()))
makedirs(log_dir, exist_ok=True)


def post_collect(trackers, ep, segment):
# print the env metric from each rollout worker
88 changes: 0 additions & 88 deletions examples/rl/cim/config.py
@@ -1,12 +1,6 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import torch
from torch.optim import Adam, RMSprop

from maro.rl.exploration import MultiLinearExplorationScheduler, epsilon_greedy


env_conf = {
"scenario": "cim",
"topology": "toy.4p_ssdd_l0.0",
@@ -40,86 +34,4 @@
+ len(vessel_attributes)
)

############################################## POLICIES ###############################################

algorithm = "ac"

# DQN settings
q_net_conf = {
"input_dim": state_dim,
"hidden_dims": [256, 128, 64, 32],
"output_dim": len(action_shaping_conf["action_space"]),
"activation": torch.nn.LeakyReLU,
"softmax": False,
"batch_norm": True,
"skip_connection": False,
"head": True,
"dropout_p": 0.0
}

q_net_optim_conf = (RMSprop, {"lr": 0.05})

dqn_conf = {
"reward_discount": .0,
"update_target_every": 5,
"num_epochs": 10,
"soft_update_coef": 0.1,
"double": False,
"exploration_strategy": (epsilon_greedy, {"epsilon": 0.4}),
"exploration_scheduling_options": [(
"epsilon", MultiLinearExplorationScheduler, {
"splits": [(2, 0.32)],
"initial_value": 0.4,
"last_ep": 5,
"final_value": 0.0,
}
)],
"replay_memory_capacity": 10000,
"random_overwrite": False,
"warmup": 100,
"rollout_batch_size": 128,
"train_batch_size": 32,
# "prioritized_replay_kwargs": {
# "alpha": 0.6,
# "beta": 0.4,
# "beta_step": 0.001,
# "max_priority": 1e8
# }
}


# AC settings
actor_net_conf = {
"input_dim": state_dim,
"hidden_dims": [256, 128, 64],
"output_dim": len(action_shaping_conf["action_space"]),
"activation": torch.nn.Tanh,
"softmax": True,
"batch_norm": False,
"head": True
}

critic_net_conf = {
"input_dim": state_dim,
"hidden_dims": [256, 128, 64],
"output_dim": 1,
"activation": torch.nn.LeakyReLU,
"softmax": False,
"batch_norm": True,
"head": True
}

actor_optim_conf = (Adam, {"lr": 0.001})
critic_optim_conf = (RMSprop, {"lr": 0.001})

ac_conf = {
"reward_discount": .0,
"grad_iters": 10,
"critic_loss_cls": torch.nn.SmoothL1Loss,
"min_logp": None,
"critic_loss_coef": 0.1,
"entropy_coef": 0.01,
# "clip_ratio": 0.8 # for PPO
"lam": .0,
"get_loss_on_rollout": False
}
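The deleted DQN block describes a plain fully connected Q-network plus an RMSprop optimizer. As a rough, generic PyTorch illustration of what those settings amount to (this is not MARO's own network class; skip_connection and dropout_p are ignored for brevity):

from torch import nn


def build_mlp(conf: dict) -> nn.Sequential:
    # Linear -> (BatchNorm) -> activation for each entry in hidden_dims,
    # plus a final head layer mapping to output_dim (the number of actions).
    dims = [conf["input_dim"]] + conf["hidden_dims"]
    layers = []
    for in_dim, out_dim in zip(dims[:-1], dims[1:]):
        layers.append(nn.Linear(in_dim, out_dim))
        if conf.get("batch_norm"):
            layers.append(nn.BatchNorm1d(out_dim))
        layers.append(conf["activation"]())
    if conf.get("head"):
        layers.append(nn.Linear(dims[-1], conf["output_dim"]))
    if conf.get("softmax"):
        layers.append(nn.Softmax(dim=-1))
    return nn.Sequential(*layers)


q_net = build_mlp(q_net_conf)
optim_cls, optim_kwargs = q_net_optim_conf  # (RMSprop, {"lr": 0.05})
q_net_optim = optim_cls(q_net.parameters(), **optim_kwargs)

The same helper would cover actor_net_conf and critic_net_conf above, which differ only in their dimensions, activations, and softmax/batch_norm flags.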
104 changes: 37 additions & 67 deletions examples/rl/cim/env_sampler.py
@@ -1,89 +1,66 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import os
import sys
from typing import Any, Dict, Optional, Tuple

import numpy as np

from maro.rl.learning import AbsEnvSampler
from maro.rl_v3.learning import AbsEnvSampler, CacheElement, SimpleAgentWrapper
from maro.simulator import Env
from maro.simulator.scenarios.cim.common import Action, ActionType

cim_path = os.path.dirname(os.path.realpath(__file__))
if cim_path not in sys.path:
sys.path.insert(0, cim_path)
from maro.simulator.scenarios.cim.common import Action, ActionType, DecisionEvent

from config import (
from .config import (
action_shaping_conf, algorithm, env_conf, port_attributes, reward_shaping_conf, state_shaping_conf,
vessel_attributes
)
from policies_v2 import policy_func_dict
from .policies import policy_creator


class CIMEnvSampler(AbsEnvSampler):
def get_state(self, event, tick=None):
"""
The state vector includes shortage and remaining vessel space over the past k days (where k is the "look_back"
value in ``state_shaping_conf``), as well as all downstream port features.
"""
tick = self.env.tick
vessel_snapshots, port_snapshots = self.env.snapshot_list["vessels"], self.env.snapshot_list["ports"]
def _get_global_and_agent_state(
self, event: DecisionEvent, tick: int = None
) -> Tuple[Optional[np.ndarray], Dict[Any, np.ndarray]]:
tick = self._env.tick
vessel_snapshots, port_snapshots = self._env.snapshot_list["vessels"], self._env.snapshot_list["ports"]
port_idx, vessel_idx = event.port_idx, event.vessel_idx
ticks = [max(0, tick - rt) for rt in range(state_shaping_conf["look_back"] - 1)]
future_port_list = vessel_snapshots[tick: vessel_idx: 'future_stop_list'].astype('int')
state = np.concatenate([
port_snapshots[ticks : [port_idx] + list(future_port_list) : port_attributes],
vessel_snapshots[tick : vessel_idx : vessel_attributes]
port_snapshots[ticks: [port_idx] + list(future_port_list): port_attributes],
vessel_snapshots[tick: vessel_idx: vessel_attributes]
])
return {port_idx: state}

def get_env_action(self, action_by_agent, event):
"""
The policy output is an integer from [0, 20] which is to be interpreted as the index of ``action_space`` in
``action_shaping_conf``. For example, action 5 corresponds to -0.5, which means loading 50% of the containers
available at the current port to the vessel, while action 18 corresponds to 0.8, which means loading 80% of the
containers on the vessel to the port. Note that action 10 corresponds 0.0, which means doing nothing.
"""
return None, {port_idx: state}

def _translate_to_env_action(self, action_dict: Dict[Any, np.ndarray], event: DecisionEvent) -> Dict[Any, object]:
action_space = action_shaping_conf["action_space"]
finite_vsl_space = action_shaping_conf["finite_vessel_space"]
has_early_discharge = action_shaping_conf["has_early_discharge"]

port_idx, action = list(action_by_agent.items()).pop()
port_idx, model_action = list(action_dict.items()).pop()

vsl_idx, action_scope = event.vessel_idx, event.action_scope
vsl_snapshots = self.env.snapshot_list["vessels"]
vsl_space = vsl_snapshots[self.env.tick:vsl_idx:vessel_attributes][2] if finite_vsl_space else float("inf")
vsl_snapshots = self._env.snapshot_list["vessels"]
vsl_space = vsl_snapshots[self._env.tick:vsl_idx:vessel_attributes][2] if finite_vsl_space else float("inf")

model_action = action["action"] if isinstance(action, dict) else action
percent = abs(action_space[model_action])
percent = abs(action_space[model_action[0]])
zero_action_idx = len(action_space) / 2 # index corresponding to value zero.
if model_action < zero_action_idx:
action_type = ActionType.LOAD
actual_action = min(round(percent * action_scope.load), vsl_space)
elif model_action > zero_action_idx:
action_type = ActionType.DISCHARGE
early_discharge = vsl_snapshots[self.env.tick:vsl_idx:"early_discharge"][0] if has_early_discharge else 0
early_discharge = vsl_snapshots[self._env.tick:vsl_idx:"early_discharge"][0] if has_early_discharge else 0
plan_action = percent * (action_scope.discharge + early_discharge) - early_discharge
actual_action = round(plan_action) if plan_action > 0 else round(percent * action_scope.discharge)
else:
actual_action, action_type = 0, None

return {port_idx: Action(vsl_idx, port_idx, actual_action, action_type)}
return {port_idx: Action(vsl_idx, int(port_idx), actual_action, action_type)}
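The docstring removed above spells out the action shaping: the policy emits an index into action_shaping_conf["action_space"], where negative values mean load and positive values mean discharge. Assuming the 21-point grid over [-1, 1] that the docstring implies (the real grid lives in config.py), the mapping works out as follows:

import numpy as np

# Assumed grid: the docstring says actions are integers in [0, 20], with
# 5 -> -0.5, 10 -> 0.0 and 18 -> 0.8, which matches np.linspace(-1, 1, 21).
action_space = np.linspace(-1, 1, 21)

for idx in (5, 10, 18):
    value = action_space[idx]
    percent = abs(value)
    kind = "LOAD" if value < 0 else ("DISCHARGE" if value > 0 else "NO-OP")
    print(f"action {idx:2d} -> {value:+.1f} -> {kind} {percent:.0%} of the action scope")
# action  5 -> -0.5 -> LOAD 50% of the action scope
# action 10 -> +0.0 -> NO-OP 0% of the action scope
# action 18 -> +0.8 -> DISCHARGE 80% of the action scope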

def get_reward(self, env_action, tick):
"""
The reward is defined as a linear combination of fulfillment and shortage measures. The fulfillment and
shortage measures are the sums of fulfillment and shortage values over the next k days, respectively, each
adjusted with exponential decay factors (using the "time_decay" value in ``reward_shaping_conf``) to put more
emphasis on the near future. Here k is the "time_window" value in ``reward_shaping_conf``. The linear
combination coefficients are given by "fulfillment_factor" and "shortage_factor" in ``reward_shaping_conf``.
"""
def _get_reward(self, env_action_dict: Dict[Any, object], tick: int) -> Dict[Any, float]:
start_tick = tick + 1
ticks = list(range(start_tick, start_tick + reward_shaping_conf["time_window"]))

# Get the ports that took actions at the given tick
ports = list(env_action.keys())
port_snapshots = self.env.snapshot_list["ports"]
ports = [int(port) for port in list(env_action_dict.keys())]
port_snapshots = self._env.snapshot_list["ports"]
future_fulfillment = port_snapshots[ticks:ports:"fulfillment"].reshape(len(ticks), -1)
future_shortage = port_snapshots[ticks:ports:"shortage"].reshape(len(ticks), -1)

@@ -94,22 +71,15 @@ def get_reward(self, env_action, tick):
)
return {agent_id: reward for agent_id, reward in zip(ports, rewards)}
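Per the removed docstring, the reward is a decayed linear combination of fulfillment and shortage over the next time_window ticks. A sketch of that computation, based on the docstring rather than the exact code behind the collapsed hunk above:

import numpy as np


def decayed_reward(future_fulfillment, future_shortage, conf):
    # future_* arrays have shape (time_window, num_acting_ports), as extracted from
    # the port snapshots a few lines up; conf is reward_shaping_conf from config.py.
    decay = np.array([conf["time_decay"] ** k for k in range(future_fulfillment.shape[0])])
    fulfillment = decay @ future_fulfillment  # one aggregate per acting port
    shortage = decay @ future_shortage
    return conf["fulfillment_factor"] * fulfillment - conf["shortage_factor"] * shortage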

def post_step(self, state, action, env_action, reward, tick):
"""
The environment sampler contains a "tracker" dict inherited from the "AbsEnvSampler" base class, which can
be used to record any information one wishes to keep track of during a roll-out episode. Here we simply record
the latest env metric without keeping the history for logging purposes.
"""
self.tracker["env_metric"] = self.env.metrics


agent2policy = {agent: f"{algorithm}.{agent}" for agent in Env(**env_conf).agent_idx_list}

def get_env_sampler():
return CIMEnvSampler(
get_env=lambda: Env(**env_conf),
get_policy_func_dict=policy_func_dict,
agent2policy=agent2policy,
reward_eval_delay=reward_shaping_conf["time_window"],
parallel_inference=False
)
def _post_step(self, cache_element: CacheElement, reward: Dict[Any, float]) -> None:
self._tracker["env_metric"] = self._env.metrics


agent2policy = {agent: f"{algorithm}_{agent}.{agent}" for agent in Env(**env_conf).agent_idx_list}
get_env_sampler=lambda: CIMEnvSampler(
get_env_func=lambda: Env(**env_conf),
policy_creator=policy_creator,
agent2policy=agent2policy,
agent_wrapper_cls=SimpleAgentWrapper,
device="cpu"
)
File renamed without changes.