-
Notifications
You must be signed in to change notification settings - Fork 0
/
rnn_theano.py
115 lines (104 loc) · 5.26 KB
/
rnn_theano.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import numpy as np
import theano as theano
import theano.tensor as T
from utils import *
import operator
class RNNTheano:
    """Single-layer tanh recurrent network with a softmax output, compiled with Theano.

    Parameters are kept in Theano shared variables:
      * ``U`` -- shape (hidden_dim, word_dim), indexed by column with the input word id
      * ``V`` -- shape (word_dim, hidden_dim), maps the hidden state to output scores
      * ``W`` -- shape (hidden_dim, hidden_dim), applied to the previous hidden state

    After construction the instance exposes the compiled callables
    ``forward_propagation``, ``predict``, ``ce_error``, ``bptt`` and ``sgd_step``.
    """

    def __init__(self, word_dim, hidden_dim=100, bptt_truncate=4):
        """Store the hyper-parameters, initialise the weights and compile the graph.

        word_dim      -- number of distinct input/output word ids
        hidden_dim    -- length of the hidden state vector
        bptt_truncate -- value handed to ``theano.scan(truncate_gradient=...)``
        """
        self.word_dim = word_dim
        self.hidden_dim = hidden_dim
        self.bptt_truncate = bptt_truncate

        # Uniform initialisation in [-1/sqrt(n), 1/sqrt(n)]; n is word_dim for U
        # and hidden_dim for V and W. The draw order (U, V, W) is kept so that a
        # seeded np.random produces the same weights as before.
        input_bound = np.sqrt(1. / word_dim)
        hidden_bound = np.sqrt(1. / hidden_dim)
        U = np.random.uniform(-input_bound, input_bound, (hidden_dim, word_dim))
        V = np.random.uniform(-hidden_bound, hidden_bound, (word_dim, hidden_dim))
        W = np.random.uniform(-hidden_bound, hidden_bound, (hidden_dim, hidden_dim))

        # Wrap the weights in shared variables using Theano's configured float type.
        float_type = theano.config.floatX
        self.U = theano.shared(name='U', value=U.astype(float_type))
        self.V = theano.shared(name='V', value=V.astype(float_type))
        self.W = theano.shared(name='W', value=W.astype(float_type))

        # Placeholder dict for the Theano graph (nothing in this class fills it).
        self.theano = {}
        self.__theano_build__()

    def __theano_build__(self):
        """Build the symbolic graph and compile every Theano function of the model.

        ``self.bptt_truncate`` is read here, when ``theano.scan`` is called, so a
        later change to the attribute only takes effect after calling this again.
        """
        U, V, W = self.U, self.V, self.W
        x = T.ivector('x')
        y = T.ivector('y')

        def recurrence(word_id, previous_state, U, V, W):
            # Hidden state: column of U selected by the word id, plus recurrent term.
            state = T.tanh(U[:, word_id] + W.dot(previous_state))
            # The softmax result is indexed with [0] to obtain a single row.
            distribution = T.nnet.softmax(V.dot(state))
            return [distribution[0], state]

        initial_state = dict(initial=T.zeros(self.hidden_dim))
        (o, s), updates = theano.scan(
            recurrence,
            sequences=x,
            outputs_info=[None, initial_state],
            non_sequences=[U, V, W],
            truncate_gradient=self.bptt_truncate,
            strict=True)

        prediction = T.argmax(o, axis=1)
        o_error = T.sum(T.nnet.categorical_crossentropy(o, y))

        # Gradient of the summed cross-entropy with respect to each weight matrix.
        dU, dV, dW = [T.grad(o_error, weights) for weights in (U, V, W)]

        # Compiled entry points.
        self.forward_propagation = theano.function([x], o)
        self.predict = theano.function([x], prediction)
        self.ce_error = theano.function([x, y], o_error)
        self.bptt = theano.function([x, y], [dU, dV, dW])

        # One plain SGD step: every shared variable moves against its gradient.
        learning_rate = T.scalar('learning_rate')
        sgd_updates = [
            (shared, shared - learning_rate * gradient)
            for shared, gradient in zip((self.U, self.V, self.W), (dU, dV, dW))
        ]
        self.sgd_step = theano.function([x, y, learning_rate], [],
                                        updates=sgd_updates)

    def calculate_total_loss(self, X, Y):
        """Return the sum of ``ce_error`` over all (x, y) pairs zipped from X and Y."""
        per_example = []
        for inputs, targets in zip(X, Y):
            per_example.append(self.ce_error(inputs, targets))
        return np.sum(per_example)

    def calculate_loss(self, X, Y):
        """Return the total loss divided by the number of target words in Y."""
        word_count = np.sum([len(targets) for targets in Y])
        total = self.calculate_total_loss(X, Y)
        return total / float(word_count)
def gradient_check_theano(model, x, y, h=0.001, error_threshold=0.01):
    """Check the model's backprop gradients against central finite differences.

    For every element of the parameters ``U``, ``V`` and ``W`` the loss is
    evaluated at ``value + h`` and ``value - h`` and the resulting estimate
    ``(f(v + h) - f(v - h)) / (2 * h)`` is compared with the corresponding entry
    returned by ``model.bptt(x, y)``. Progress and any failure are printed; the
    check stops at the first element whose relative error exceeds the threshold.

    model           -- object exposing ``U``/``V``/``W`` (with ``get_value`` and
                       ``set_value``), ``bptt``, ``calculate_total_loss`` and a
                       ``bptt_truncate`` attribute
    x, y            -- one input sequence and its target sequence
    h               -- perturbation used for the finite difference
    error_threshold -- largest accepted relative error

    Returns None in every case. Side effects: ``model.bptt_truncate`` is left
    at 1000 and, when the model provides ``__theano_build__``, its compiled
    functions are rebuilt with that setting. Parameter values are restored
    after each perturbation.
    """
    # Backpropagate through the whole sequence so the analytic gradient is
    # comparable with the numerical one. The truncation length is consumed when
    # the scan graph is built, so the attribute change alone would not reach the
    # already compiled ``bptt`` function: rebuild the graph when possible.
    model.bptt_truncate = 1000
    rebuild = getattr(model, '__theano_build__', None)
    if callable(rebuild):
        rebuild()

    # Gradients computed by backpropagation, in the order of model_parameters.
    bptt_gradients = model.bptt(x, y)
    # Names of all parameters we want to check.
    model_parameters = ['U', 'V', 'W']

    for pidx, pname in enumerate(model_parameters):
        # Get the actual parameter from the model, e.g. model.W
        parameter_T = getattr(model, pname)
        parameter = parameter_T.get_value()
        print("Performing gradient check for parameter %s with size %d." % (pname, np.prod(parameter.shape)))

        # Visit each element of the parameter matrix: (0, 0), (0, 1), ...
        for ix in np.ndindex(*parameter.shape):
            # Save the original value so we can reset it later.
            original_value = parameter[ix]

            # Estimate the gradient using (f(x+h) - f(x-h)) / (2*h).
            parameter[ix] = original_value + h
            parameter_T.set_value(parameter)
            gradplus = model.calculate_total_loss([x], [y])
            parameter[ix] = original_value - h
            parameter_T.set_value(parameter)
            gradminus = model.calculate_total_loss([x], [y])
            estimated_gradient = (gradplus - gradminus) / (2 * h)

            # Restore the parameter before comparing, so an early return on
            # failure leaves the model unchanged.
            parameter[ix] = original_value
            parameter_T.set_value(parameter)

            # The gradient for this element calculated using backpropagation.
            backprop_gradient = bptt_gradients[pidx][ix]

            # Relative error: |a - b| / (|a| + |b|). Two exactly-zero gradients
            # agree, so report 0 instead of dividing 0 by 0.
            denominator = np.abs(backprop_gradient) + np.abs(estimated_gradient)
            if denominator:
                relative_error = np.abs(backprop_gradient - estimated_gradient) / denominator
            else:
                relative_error = 0.0

            # If the error is too large, fail the gradient check.
            if relative_error > error_threshold:
                print("Gradient Check ERROR: parameter=%s ix=%s" % (pname, ix))
                print("+h Loss: %f" % gradplus)
                print("-h Loss: %f" % gradminus)
                print("Estimated_gradient: %f" % estimated_gradient)
                print("Backpropagation gradient: %f" % backprop_gradient)
                print("Relative Error: %f" % relative_error)
                return
        print("Gradient check for parameter %s passed." % (pname))