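"""value_function_agent.py

Q-learning on CartPole-v0 with a scikit-learn MLPRegressor as the value
function approximator, built on the FNAgent, Trainer, and Observer base
classes from fn_framework.
"""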
import random
import argparse
import numpy as np
from sklearn.neural_network import MLPRegressor
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
# sklearn.externals.joblib was removed in scikit-learn 0.23;
# use the standalone joblib package instead.
import joblib
import gym
from fn_framework import FNAgent, Trainer, Observer


class ValueFunctionAgent(FNAgent):

    def save(self, model_path):
        joblib.dump(self.model, model_path)

    @classmethod
    def load(cls, env, model_path, epsilon=0.0001):
        actions = list(range(env.action_space.n))
        agent = cls(epsilon, actions)
        agent.model = joblib.load(model_path)
        agent.initialized = True
        return agent

    def initialize(self, experiences):
        scaler = StandardScaler()
        estimator = MLPRegressor(hidden_layer_sizes=(10, 10), max_iter=1)
        self.model = Pipeline([("scaler", scaler), ("estimator", estimator)])

        states = np.vstack([e.s for e in experiences])
        self.model.named_steps["scaler"].fit(states)

        # Run one update with gamma=0 so that partial_fit initializes the
        # estimator's weights before predict is ever called on it.
        self.update([experiences[0]], gamma=0)
        self.initialized = True
        print("Done initialization. From now, begin training!")

    def estimate(self, s):
        # Return the estimated action values for a single state.
        estimated = self.model.predict(s)[0]
        return estimated

    def _predict(self, states):
        if self.initialized:
            predicteds = self.model.predict(states)
        else:
            # Before the first fit, return random values of shape
            # (len(states), len(self.actions)).
            size = len(self.actions) * len(states)
            predicteds = np.random.uniform(size=size)
            predicteds = predicteds.reshape((-1, len(self.actions)))
        return predicteds

    def update(self, experiences, gamma):
        states = np.vstack([e.s for e in experiences])
        n_states = np.vstack([e.n_s for e in experiences])

        estimateds = self._predict(states)
        future = self._predict(n_states)

        # Q-learning target: r + gamma * max_a' Q(s', a') for non-terminal
        # transitions, plain r for terminal ones.
        for i, e in enumerate(experiences):
            reward = e.r
            if not e.d:
                reward += gamma * np.max(future[i])
            estimateds[i][e.a] = reward

        estimateds = np.array(estimateds)
        # Pipeline has no partial_fit, so scale manually and update the
        # estimator incrementally.
        states = self.model.named_steps["scaler"].transform(states)
        self.model.named_steps["estimator"].partial_fit(states, estimateds)
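
# A rough sketch of one update's shapes on CartPole (assumed batch size, not
# part of the original file): CartPole-v0 states are 4-dimensional and there
# are 2 actions, so for a batch of, say, 32 experiences `states` is (32, 4),
# `_predict` returns a (32, 2) array of action values, and partial_fit
# regresses those rows toward the targets written into `estimateds`.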


class CartPoleObserver(Observer):

    def transform(self, state):
        # Reshape to (1, n_features) so a single state can be fed to
        # scikit-learn's predict.
        return np.array(state).reshape((1, -1))


class ValueFunctionTrainer(Trainer):

    def train(self, env, episode_count=220, epsilon=0.1, initial_count=-1,
              render=False):
        actions = list(range(env.action_space.n))
        agent = ValueFunctionAgent(epsilon, actions)
        self.train_loop(env, agent, episode_count, initial_count, render)
        return agent

    def begin_train(self, episode, agent):
        agent.initialize(self.experiences)

    def step(self, episode, step_count, agent, experience):
        if self.training:
            batch = random.sample(self.experiences, self.batch_size)
            agent.update(batch, self.gamma)

    def episode_end(self, episode, step_count, agent):
        rewards = [e.r for e in self.get_recent(step_count)]
        self.reward_log.append(sum(rewards))
        if self.is_event(episode, self.report_interval):
            recent_rewards = self.reward_log[-self.report_interval:]
            self.logger.describe("reward", recent_rewards, episode=episode)


def main(play):
    env = CartPoleObserver(gym.make("CartPole-v0"))
    trainer = ValueFunctionTrainer()
    path = trainer.logger.path_of("value_function_agent.pkl")

    if play:
        agent = ValueFunctionAgent.load(env, path)
        agent.play(env)
    else:
        trained = trainer.train(env)
        trainer.logger.plot("Rewards", trainer.reward_log,
                            trainer.report_interval)
        trained.save(path)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="VF Agent")
    parser.add_argument("--play", action="store_true",
                        help="play with trained model")
    args = parser.parse_args()
    main(args.play)
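
# Usage (assuming fn_framework.py is on the import path and gym with
# CartPole-v0 is installed):
#   python value_function_agent.py          # train and save the model
#   python value_function_agent.py --play   # play with the trained model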