Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add NoisyObs. #23

Merged
merged 9 commits into from
Jun 30, 2020
Merged
6 changes: 6 additions & 0 deletions src/seals/diagnostics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,9 @@
entry_point="seals.diagnostics.risky_path:RiskyPathEnv",
max_episode_steps=5,
)

# Register the noisy-observation gridworld (NoisyObsEnv) under the seals
# namespace; episodes are capped at 15 steps.
gym.register(
    id="seals/NoisyObs-v0",
    entry_point="seals.diagnostics.noisy_obs:NoisyObsEnv",
    max_episode_steps=15,
)
65 changes: 65 additions & 0 deletions src/seals/diagnostics/noisy_obs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""Environment testing for robustness to noise."""

from gym import spaces
import numpy as np

from seals import base_envs, util


class NoisyObsEnv(base_envs.ResettablePOMDP):
    """Simple gridworld with noisy observations.

    The agent randomly starts at one of the corners of an MxM grid and
    tries to reach and stay at the center. The observation consists of the
    agent's (x, y) coordinates and L "distractor" samples of Gaussian noise.
    The challenge is to select the relevant features in the observations, and
    not overfit to noise.
    """

    def __init__(self, *, size: int = 5, noise_length: int = 20):
        """Build environment.

        Args:
            size: width and height of gridworld.
            noise_length: dimension of noise vector in observation.
        """
        self._size = size
        self._noise_length = noise_length
        # The goal is the central cell (integer division, so for even sizes it
        # is the cell just past the midpoint).
        self._goal = np.array([self._size // 2, self._size // 2])

        super().__init__(
            state_space=spaces.MultiDiscrete([size, size]),
            action_space=spaces.Discrete(5),
            # Observation layout: [x, y, noise_0, ..., noise_{L-1}]. The
            # coordinates are bounded by the grid; the noise is unbounded.
            observation_space=spaces.Box(
                low=np.concatenate(([0, 0], np.full(self._noise_length, -np.inf))),
                high=np.concatenate(
                    ([size - 1, size - 1], np.full(self._noise_length, np.inf)),
                ),
                dtype=np.float32,
            ),
        )

    def terminal(self, state: np.ndarray, n_actions_taken: int) -> bool:
        """Always returns False."""
        return False

    def initial_state(self) -> np.ndarray:
        """Returns one of the grid's corners."""
        n = self._size
        corners = np.array([[0, 0], [n - 1, 0], [0, n - 1], [n - 1, n - 1]])
        # NOTE(review): `rand_state` is not defined in this class; presumably it
        # is the random state supplied by the base class -- confirm.
        return corners[self.rand_state.randint(4)]

    def reward(self, state: np.ndarray, action: int, new_state: np.ndarray) -> float:
        """Returns +1.0 reward if state is the goal and 0.0 otherwise."""
        # Only the current `state` is compared; `action` and `new_state` are
        # not used.
        return float(np.all(state == self._goal))

    def transition(self, state: np.ndarray, action: int) -> np.ndarray:
        """Returns next state according to grid."""
        return util.grid_transition_fn(
            state,
            action,
            x_bounds=(0, self._size - 1),
            y_bounds=(0, self._size - 1),
        )

    def obs_from_state(self, state: np.ndarray) -> np.ndarray:
        """Returns (x, y) concatenated with Gaussian noise, as float32."""
        noise_vector = self.rand_state.randn(self._noise_length)
        # np.concatenate promotes the result to float64; cast it so that the
        # observation matches the float32 dtype declared for observation_space.
        return np.concatenate([state, noise_vector]).astype(np.float32)
40 changes: 39 additions & 1 deletion src/seals/util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Miscellaneous utilities."""

from typing import Optional
from typing import Optional, Tuple

import gym
import numpy as np
Expand Down Expand Up @@ -118,3 +118,41 @@ def sample_distribution(
def one_hot_encoding(pos: int, size: int) -> np.ndarray:
    """Returns a 1-D one-hot vector of length `size` with a 1 at index `pos`.

    Args:
        pos: index of the entry set to 1.
        size: length of the returned vector.

    Returns:
        Row `pos` of the `size` x `size` identity matrix.
    """
    identity = np.eye(size)
    return identity[pos]


def grid_transition_fn(
    state: np.ndarray,
    action: int,
    x_bounds: Tuple[float, float] = (-np.inf, np.inf),
    y_bounds: Tuple[float, float] = (-np.inf, np.inf),
) -> np.ndarray:
    """Returns transition of a deterministic gridworld.

    Agent is bounded in the region limited by x_bounds and y_bounds,
    ends inclusive.

    (0, 0) is interpreted to be top-left corner.

    Actions:
        0: Right
        1: Down
        2: Left
        3: Up
        4: Stay put

    Args:
        state: current (x, y) position.
        action: index of the action to take, as listed above.
        x_bounds: inclusive (min, max) limits for the x coordinate.
        y_bounds: inclusive (min, max) limits for the y coordinate.

    Returns:
        The next (x, y) position, as an array with the same dtype as `state`.

    Raises:
        IndexError: if `action` is not one of the actions listed above.
    """
    dirs = [
        (1, 0),
        (0, 1),
        (-1, 0),
        (0, -1),
        (0, 0),
    ]

    # Plain list indexing would let negative actions wrap around silently
    # (e.g. -1 would mean "Stay put"), so check the range explicitly.
    if not 0 <= action < len(dirs):
        raise IndexError(
            "action must be in [0, {}], got {}".format(len(dirs) - 1, action)
        )

    x, y = state
    dx, dy = dirs[action]

    # Moves that would leave the grid are clipped back onto its border.
    next_x = np.clip(x + dx, *x_bounds)
    next_y = np.clip(y + dy, *y_bounds)
    # Clipping against float bounds may yield floats; restore the state's dtype.
    next_state = np.array([next_x, next_y], dtype=state.dtype)

    return next_state