hopper_reinforce.py

from __future__ import annotations
import random
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.nn as nn
from torch.distributions.normal import Normal
import gymnasium as gym
from tqdm import tqdm
plt.rcParams["figure.figsize"] = (10, 5)
class Policy_Network(nn.Module):
    """Parametrized Policy Network."""

    def __init__(self, obs_space_dims: int, action_space_dims: int):
        """Initializes a neural network that estimates the mean and standard deviation
        of a normal distribution from which an action is sampled.

        Args:
            obs_space_dims: Dimension of the observation space
            action_space_dims: Dimension of the action space
        """
        super().__init__()

        hidden_space1 = 256  # Nothing special about 256, feel free to change
        hidden_space2 = 512  # Nothing special about 512, feel free to change

        # Shared network
        self.shared_net = nn.Sequential(
            nn.Linear(obs_space_dims, hidden_space1),
            nn.ReLU(),
            nn.Linear(hidden_space1, hidden_space2),
            nn.ReLU(),
        )

        # Policy mean specific linear layer
        self.policy_mean_net = nn.Sequential(
            nn.Linear(hidden_space2, action_space_dims)
        )

        # Policy std dev specific linear layer
        self.policy_stddev_net = nn.Sequential(
            nn.Linear(hidden_space2, action_space_dims)
        )

    def forward(self, x: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
        """Conditioned on the observation, returns the mean and standard deviation
        of a normal distribution from which an action is sampled.

        Args:
            x: Observation from the environment

        Returns:
            action_means: predicted mean of the normal distribution
            action_stddevs: predicted standard deviation of the normal distribution
        """
        shared_features = self.shared_net(x.float())

        action_means = self.policy_mean_net(shared_features)
        # Softplus, log(1 + e^x), keeps the predicted std dev strictly positive
        action_stddevs = torch.log(
            1 + torch.exp(self.policy_stddev_net(shared_features))
        )

        return action_means, action_stddevs
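
# A quick shape sanity check (illustrative snippet only, not part of training;
# the dimensions assume Hopper-v4's 11-dim observations and 3-dim actions):
#
#   net = Policy_Network(11, 3)
#   means, stddevs = net(torch.zeros(1, 11))
#   assert means.shape == stddevs.shape == torch.Size([1, 3])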

class REINFORCE:
    """REINFORCE algorithm."""

    def __init__(self, obs_space_dims: int, action_space_dims: int):
        """Initializes an agent that learns a policy via REINFORCE algorithm [1]
        to solve the task at hand (Hopper-v4).

        Args:
            obs_space_dims: Dimension of the observation space
            action_space_dims: Dimension of the action space
        """
        # Hyperparameters
        self.learning_rate = 1e-4  # Learning rate for policy optimization
        self.gamma = 0.95  # Discount factor
        self.eps = 1e-6  # small number for mathematical stability

        self.probs = []  # Stores probability values of the sampled action
        self.rewards = []  # Stores the corresponding rewards

        self.net = Policy_Network(obs_space_dims, action_space_dims)
        self.optimizer = torch.optim.AdamW(self.net.parameters(), lr=self.learning_rate)
    def sample_action(self, state: np.ndarray) -> np.ndarray:
        """Returns an action, conditioned on the policy and observation.

        Args:
            state: Observation from the environment

        Returns:
            action: Action to be performed
        """
        state = torch.tensor(np.array([state]))
        action_means, action_stddevs = self.net(state)

        # create a normal distribution from the predicted
        # mean and standard deviation and sample an action
        distrib = Normal(action_means[0] + self.eps, action_stddevs[0] + self.eps)
        action = distrib.sample()
        prob = distrib.log_prob(action)

        action = action.numpy()

        self.probs.append(prob)

        return action
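
    # Illustrative usage (hypothetical standalone snippet; Hopper-v4 has
    # 11-dim observations and 3-dim actions):
    #
    #   agent = REINFORCE(11, 3)
    #   action = agent.sample_action(np.zeros(11))  # ndarray of shape (3,)
    #   assert len(agent.probs) == 1  # one log-prob stored per sampled action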

    def update(self):
        """Updates the policy network's weights."""
        running_g = 0
        gs = []

        # Discounted return (backwards) - [::-1] will return an array in reverse
        for R in self.rewards[::-1]:
            running_g = R + self.gamma * running_g
            gs.insert(0, running_g)

        deltas = torch.tensor(gs)

        loss = 0
        # minimize -1 * prob * reward obtained
        for log_prob, delta in zip(self.probs, deltas):
            loss += log_prob.mean() * delta * (-1)

        # Update the policy network
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Empty / zero out all episode-centric/related variables
        self.probs = []
        self.rewards = []
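
# Worked example of the return computation in `update` (illustrative numbers):
# with rewards [1, 1, 1] and gamma = 0.95, iterating backwards gives
#   G_2 = 1
#   G_1 = 1 + 0.95 * 1    = 1.95
#   G_0 = 1 + 0.95 * 1.95 = 2.8525
# so gs == [2.8525, 1.95, 1], matching G_t = R_{t+1} + gamma * G_{t+1}.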

if __name__ == "__main__":
    # Create and wrap the environment
    env = gym.make("Hopper-v4")
    wrapped_env = gym.wrappers.RecordEpisodeStatistics(
        env, 50
    )  # Records episode-reward

    total_num_episodes = int(5e3)  # Total number of episodes
    # Observation space of Hopper-v4 (11)
    obs_space_dims = env.observation_space.shape[0]
    # Action space of Hopper-v4 (3)
    action_space_dims = env.action_space.shape[0]
    rewards_over_seeds = []
    for seed in [3]:  # [1, 2, 3, 5, 8]: # Fibonacci seeds
        # set seed
        torch.manual_seed(seed)
        random.seed(seed)
        np.random.seed(seed)

        # Reinitialize agent every seed
        agent = REINFORCE(obs_space_dims, action_space_dims)
        reward_over_episodes = []

        for episode in tqdm(range(total_num_episodes)):
            # gymnasium v26 requires users to set seed while resetting the environment.
            # Seed only the first reset; re-seeding every episode would make every
            # episode start from the exact same initial state.
            obs, info = wrapped_env.reset(seed=seed if episode == 0 else None)

            done = False
            while not done:
                action = agent.sample_action(obs)

                # Step return type - `tuple[ObsType, SupportsFloat, bool, bool, dict[str, Any]]`
                # These represent the next observation, the reward from the step,
                # if the episode is terminated, if the episode is truncated and
                # additional info from the step
                obs, reward, terminated, truncated, info = wrapped_env.step(action)
                agent.rewards.append(reward)

                # End the episode when either truncated or terminated is true
                # - truncated: The episode duration reaches the max number of timesteps
                # - terminated: The hopper is unhealthy, e.g. it has fallen over
                #   or a state value is no longer finite
                done = terminated or truncated

            reward_over_episodes.append(wrapped_env.return_queue[-1])
            agent.update()

            if episode % 1000 == 0:
                avg_reward = int(np.mean(wrapped_env.return_queue))
                print("Episode:", episode, "Average Reward:", avg_reward)

        rewards_over_seeds.append(reward_over_episodes)
    # Save the policy network weights
    torch.save(agent.net.state_dict(), "policy_network_weights_hopper.pth")
    print("Training finished. Weights saved to 'policy_network_weights_hopper.pth'.")

    rewards_to_plot = [
        [reward[0] for reward in rewards] for rewards in rewards_over_seeds
    ]
    df1 = pd.DataFrame(rewards_to_plot).melt()
    df1.rename(columns={"variable": "episodes", "value": "reward"}, inplace=True)
    sns.set(style="darkgrid", context="talk", palette="rainbow")
    sns.lineplot(x="episodes", y="reward", data=df1).set(
        title="REINFORCE for Hopper-v4"
    )
    plt.show()