dqn_agent.py
from keras.models import Sequential, load_model
from keras.layers import Dense
from collections import deque
import numpy as np
import random
# Deep Q Learning Agent + Maximin
#
# This version provides only one value per input, which indicates
# the score expected in that state. This is because the algorithm
# tries to find the best final state among the possible next states,
# in contrast to the traditional approach of finding the best
# action for a particular state.
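#
# A short illustrative contrast (not part of the original file) between the two
# selection rules, with Q and V as the usual action-value and state-value notions:
#
#   traditional DQN:  pick action = argmax over a of Q(state, a)   # one output per action
#   this agent:       pick state' = argmax over s' of V(s')        # one output per candidate state
#
# where s' ranges over the states reachable from the current state (see best_state below).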
class DQNAgent:
    '''Deep Q Learning Agent + Maximin

    Args:
        state_size (int): Size of the input domain
        mem_size (int): Size of the replay buffer
        discount (float): How important are the future rewards compared to the immediate ones [0,1]
        epsilon (float): Exploration (probability of random values given) value at the start
        epsilon_min (float): At what epsilon value the agent stops decrementing it
        epsilon_stop_episode (int): At what episode the agent stops decreasing the exploration variable
        n_neurons (list(int)): List with the number of neurons in each inner layer
        activations (list): List with the activations used in each inner layer, as well as the output
        loss (obj): Loss function
        optimizer (obj): Optimizer used
        replay_start_size (int): Minimum memory size needed to start training
        modelFile (str): Previously trained model file path to load (arguments such as activations will be ignored)
    '''
    def __init__(self, state_size, mem_size=10000, discount=0.95,
                 epsilon=1, epsilon_min=0, epsilon_stop_episode=0,
                 n_neurons=[32, 32], activations=['relu', 'relu', 'linear'],
                 loss='mse', optimizer='adam', replay_start_size=None, modelFile=None):
        if len(activations) != len(n_neurons) + 1:
            raise ValueError("n_neurons and activations do not match, "
                             f"expected a n_neurons list of length {len(activations) - 1}")

        if replay_start_size is not None and replay_start_size > mem_size:
            raise ValueError("replay_start_size must be <= mem_size")

        if mem_size <= 0:
            raise ValueError("mem_size must be > 0")

        self.state_size = state_size
        self.mem_size = mem_size
        self.memory = deque(maxlen=mem_size)
        self.discount = discount

        if epsilon_stop_episode > 0:
            self.epsilon = epsilon
            self.epsilon_min = epsilon_min
            self.epsilon_decay = (self.epsilon - self.epsilon_min) / epsilon_stop_episode
        else:  # no random exploration
            # epsilon_min and epsilon_decay must still exist, otherwise train() fails
            # when it updates the exploration variable
            self.epsilon = 0
            self.epsilon_min = 0
            self.epsilon_decay = 0

        self.n_neurons = n_neurons
        self.activations = activations
        self.loss = loss
        self.optimizer = optimizer

        if not replay_start_size:
            replay_start_size = mem_size / 2
        self.replay_start_size = replay_start_size

        # load an existing model
        if modelFile is not None:
            self.model = load_model(modelFile)
        # create a new model
        else:
            self.model = self._build_model()
    def _build_model(self):
        '''Builds a Keras deep neural network model'''
        model = Sequential()
        model.add(Dense(self.n_neurons[0], input_dim=self.state_size, activation=self.activations[0]))

        for i in range(1, len(self.n_neurons)):
            model.add(Dense(self.n_neurons[i], activation=self.activations[i]))

        model.add(Dense(1, activation=self.activations[-1]))
        model.compile(loss=self.loss, optimizer=self.optimizer)

        return model
    def add_to_memory(self, current_state, next_state, reward, done):
        '''Adds a play to the replay memory buffer'''
        self.memory.append((current_state, next_state, reward, done))

    def random_value(self):
        '''Random score for a certain action'''
        return random.random()

    def predict_value(self, state):
        '''Predicts the score for a certain state'''
        return self.model.predict(state, verbose=0)[0]

    def act(self, state):
        '''Returns the expected score of a certain state'''
        state = np.reshape(state, [1, self.state_size])
        if random.random() <= self.epsilon:
            return self.random_value()
        else:
            return self.predict_value(state)
    def best_state(self, states):
        '''Returns the best state for a given collection of states'''
        max_value = None
        best_state = None

        if random.random() <= self.epsilon:
            return random.choice(list(states))

        for state in states:
            value = self.predict_value(np.reshape(state, [1, self.state_size]))
            # compare against None explicitly so a legitimate score of 0 is not discarded
            if max_value is None or value > max_value:
                max_value = value
                best_state = state

        return best_state
    def train(self, batch_size=32, epochs=3):
        '''Trains the agent'''
        if batch_size > self.mem_size:
            print('WARNING: batch size is bigger than mem_size. The agent will not be trained.')

        n = len(self.memory)

        if n >= self.replay_start_size and n >= batch_size:
            batch = random.sample(self.memory, batch_size)

            # Get the expected score for the next states, in batch (better performance)
            next_states = np.array([x[1] for x in batch])
            next_qs = [x[0] for x in self.model.predict(next_states, verbose=0)]

            x = []
            y = []

            # Build xy structure to fit the model in batch (better performance)
            for i, (state, _, reward, done) in enumerate(batch):
                if not done:
                    # Bellman-style target: immediate reward plus discounted value of the next state
                    new_q = reward + self.discount * next_qs[i]
                else:
                    new_q = reward

                x.append(state)
                y.append(new_q)

            # Fit the model to the given values
            self.model.fit(np.array(x), np.array(y), batch_size=batch_size, epochs=epochs, verbose=0)

            # Update the exploration variable
            if self.epsilon > self.epsilon_min:
                self.epsilon -= self.epsilon_decay
    def save_model(self, name):
        '''Saves the current model.
        It is recommended to name the file with the ".keras" extension.'''
        self.model.save(name)
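

# Minimal usage sketch (not part of the original file): it exercises the agent's
# public API (best_state -> add_to_memory -> train -> save_model) against a toy
# stand-in environment. _ToyEnv and its methods (get_next_states, play) are
# hypothetical placeholders for the real game environment this agent is normally
# paired with; the actual training loop lives elsewhere in the repository.
if __name__ == '__main__':
    class _ToyEnv:
        '''Toy environment: each state is a 4-feature vector, reward is its sum.'''
        def __init__(self):
            self.steps = 0

        def get_next_states(self):
            # One candidate successor state per fake action
            return {action: list(np.random.rand(4)) for action in range(5)}

        def play(self, state):
            self.steps += 1
            reward = float(sum(state))
            done = self.steps % 10 == 0  # episodes of 10 placements
            return reward, done

    env = _ToyEnv()
    agent = DQNAgent(state_size=4, mem_size=200, epsilon_stop_episode=20,
                     replay_start_size=30)

    current_state = [0, 0, 0, 0]
    for episode in range(5):
        done = False
        while not done:
            # Maximin step: score every reachable state and move to the best one
            candidates = env.get_next_states()
            best = agent.best_state(candidates.values())
            reward, done = env.play(best)
            agent.add_to_memory(current_state, best, reward, done)
            current_state = best
        agent.train(batch_size=16, epochs=1)

    agent.save_model('toy_agent.keras')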