planner.py
class Planner():

    def __init__(self, env):
        self.env = env
        self.log = []

    def initialize(self):
        self.env.reset()
        self.log = []

    def plan(self, gamma=0.9, threshold=0.0001):
        raise NotImplementedError("Planner subclasses must implement the plan method.")

    def transitions_at(self, state, action):
        # Yield (probability, next_state, reward) for every reachable next state.
        transition_probs = self.env.transit_func(state, action)
        for next_state in transition_probs:
            prob = transition_probs[next_state]
            reward, _ = self.env.reward_func(next_state)
            yield prob, next_state, reward

    def dict_to_grid(self, state_reward_dict):
        # Convert a {state: value} dict into a 2D list laid out like the grid.
        grid = []
        for i in range(self.env.row_length):
            row = [0] * self.env.column_length
            grid.append(row)
        for s in state_reward_dict:
            grid[s.row][s.column] = state_reward_dict[s]

        return grid

class ValueIterationPlanner(Planner):

    def __init__(self, env):
        super().__init__(env)

    def plan(self, gamma=0.9, threshold=0.0001):
        self.initialize()
        actions = self.env.actions
        V = {}
        for s in self.env.states:
            # Initialize each state's expected reward.
            V[s] = 0

        while True:
            delta = 0
            self.log.append(self.dict_to_grid(V))
            for s in V:
                if not self.env.can_action_at(s):
                    continue
                expected_rewards = []
                for a in actions:
                    r = 0
                    for prob, next_state, reward in self.transitions_at(s, a):
                        r += prob * (reward + gamma * V[next_state])
                    expected_rewards.append(r)
                max_reward = max(expected_rewards)
                delta = max(delta, abs(max_reward - V[s]))
                V[s] = max_reward

            if delta < threshold:
                break

        V_grid = self.dict_to_grid(V)
        return V_grid

class PolicyIterationPlanner(Planner):

    def __init__(self, env):
        super().__init__(env)
        self.policy = {}

    def initialize(self):
        super().initialize()
        self.policy = {}
        actions = self.env.actions
        states = self.env.states
        for s in states:
            self.policy[s] = {}
            for a in actions:
                # Initialize the policy.
                # At first, each action is taken uniformly.
                self.policy[s][a] = 1 / len(actions)

    def estimate_by_policy(self, gamma, threshold):
        V = {}
        for s in self.env.states:
            # Initialize each state's expected reward.
            V[s] = 0

        while True:
            delta = 0
            for s in V:
                expected_rewards = []
                for a in self.policy[s]:
                    action_prob = self.policy[s][a]
                    r = 0
                    for prob, next_state, reward in self.transitions_at(s, a):
                        r += action_prob * prob * \
                            (reward + gamma * V[next_state])
                    expected_rewards.append(r)
                value = sum(expected_rewards)
                delta = max(delta, abs(value - V[s]))
                V[s] = value
            if delta < threshold:
                break

        return V

    def plan(self, gamma=0.9, threshold=0.0001):
        self.initialize()
        states = self.env.states
        actions = self.env.actions

        def take_max_action(action_value_dict):
            return max(action_value_dict, key=action_value_dict.get)

        while True:
            update_stable = True
            # Estimate expected rewards under the current policy.
            V = self.estimate_by_policy(gamma, threshold)
            self.log.append(self.dict_to_grid(V))

            for s in states:
                # Get the action prescribed by the current policy.
                policy_action = take_max_action(self.policy[s])

                # Compare it with the other actions.
                action_rewards = {}
                for a in actions:
                    r = 0
                    for prob, next_state, reward in self.transitions_at(s, a):
                        r += prob * (reward + gamma * V[next_state])
                    action_rewards[a] = r
                best_action = take_max_action(action_rewards)
                if policy_action != best_action:
                    update_stable = False

                # Update the policy greedily: best_action gets prob 1, the rest 0.
                for a in self.policy[s]:
                    prob = 1 if a == best_action else 0
                    self.policy[s][a] = prob

            if update_stable:
                # If the policy was not updated, stop the iteration.
                break

        # Turn the value dictionary into a grid.
        V_grid = self.dict_to_grid(V)
        return V_grid
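

# --- Usage sketch (not part of the original module) ---
# A minimal, self-contained example of running both planners. The real project
# pairs this module with an environment class defined elsewhere; StubEnv below
# is a hypothetical stand-in that implements only the interface the planners
# rely on (states, actions, reset, can_action_at, transit_func, reward_func,
# row_length, column_length) for a tiny deterministic 1x3 grid whose rightmost
# cell is a terminal goal with reward 1.

if __name__ == "__main__":
    from collections import namedtuple

    State = namedtuple("State", ["row", "column"])

    class StubEnv():

        def __init__(self):
            self.row_length = 1
            self.column_length = 3
            self.actions = ["left", "right"]
            self.states = [State(0, c) for c in range(self.column_length)]
            self.goal = State(0, 2)

        def reset(self):
            pass

        def can_action_at(self, state):
            # No actions are available at the terminal (goal) cell.
            return state != self.goal

        def transit_func(self, state, action):
            # Deterministic move: "right" increases the column, "left" decreases
            # it, clipped to the grid. Returns {next_state: probability}.
            if not self.can_action_at(state):
                return {}
            delta = 1 if action == "right" else -1
            column = min(max(state.column + delta, 0), self.column_length - 1)
            return {State(state.row, column): 1.0}

        def reward_func(self, state):
            # Reward 1 at the goal, 0 elsewhere; the second element mimics the
            # (reward, done) pair the planners unpack from reward_func.
            done = state == self.goal
            return (1 if done else 0), done

    env = StubEnv()
    print("Value Iteration:", ValueIterationPlanner(env).plan())
    print("Policy Iteration:", PolicyIterationPlanner(env).plan())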