-
Notifications
You must be signed in to change notification settings - Fork 0
/
train.py
400 lines (313 loc) · 15.9 KB
/
train.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
# -*- coding: utf-8 -*-
"""Face_Recognition.ipynb
Automatically generated by Colaboratory.
Original file is located at
https://colab.research.google.com/drive/1uNMFPmSaljIPqtxJ0CjfbhWFd67dmeIz
"""
import os; import time
#os.environ['AUTOGRAPH_VERBOSITY'] = '1'
from os import listdir
#from os.path import isdir
import gc
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
import pickle
import tensorflow as tf
import tensorflow.keras.backend as K
from tensorflow.keras.models import Model, load_model
#from keras.engine.topology import Layer
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.regularizers import l2
from keras.utils.vis_utils import plot_model
from tensorflow.keras.layers import Input, Concatenate, Lambda, Flatten, Dense
from helper_func import image_resize_cum_preprocess, split_text
# similar code as above...
#from tensorflow.keras.backend import set_session
#config = tf.compat.v1.ConfigProto()
#config.gpu_options.allow_growth = True # For full use of GPU
#config.gpu_options.per_process_gpu_memory_fraction = 0.9# use 90% of gpu
#config.log_device_placement = True
#sess = tf.compat.v1.Session(config = config)#compat.v1.Session
#set_session(sess)
# --- Environment report: library versions and GPU availability ---
print(f"Tensor Flow Version: {tf.__version__}")
print(f"Keras Version: {tf.keras.__version__}")
print()
#print(f"Pandas {pd.__version__}")
# True when TensorFlow can see at least one physical GPU device.
gpu = len(tf.config.list_physical_devices('GPU'))>0
print("GPU is", "available" if gpu else "NOT AVAILABLE")
print(100*"#")
# declare path
# Root folder of the dataset: one sub-directory of cropped face images per person.
image_path = "cropped_images"
# splitter - Train and Test - 80% and 20%
def splitter(img_path):
    """Split every class folder's image file names into train and test lists.

    Arguments:
    img_path -- root folder; each sub-directory is one class (person) and
                contains that person's image files.

    Returns:
    (train_list, test_list) -- two parallel lists with one sub-list of file
    names per class: the first ~80% of each class's files go to train, the
    remaining floor(n/5) (~20%) go to test.
    """
    # One sub-list of file names per class sub-directory.
    global_lst = []
    for subdir in listdir(img_path):
        global_lst.append(list(listdir(os.path.join(img_path, subdir))))
    train_list, test_list = [], []
    for file_names in global_lst:
        # Hold out floor(len/5) examples of this class for testing.
        n_train = len(file_names) - len(file_names) // 5
        # append (not extend([...])) keeps one sub-list per class.
        train_list.append(file_names[:n_train])
        test_list.append(file_names[n_train:])
    # return the lists
    return train_list, test_list
# this function associates a label with each class's stacked image array.
def labeller(global_path_for_images, train_img_names, size = (160, 160)):
    """Load, preprocess and stack the training images of every class.

    Arguments:
    global_path_for_images -- root folder with one sub-directory per person
    train_img_names -- list of per-class training file-name lists (from
                       splitter); assumed to be in the same order listdir()
                       yields the sub-directories — TODO confirm both were
                       produced from the same directory listing
    size -- target size forwarded to image_resize_cum_preprocess

    Returns:
    dict with 'face_array' (one stacked array per class, or None for an
    empty class) and 'labels' (the matching sub-directory / person names).
    """
    face_arr_global = []
    label_arr = []
    for subdir, file_names in zip(listdir(global_path_for_images), train_img_names):
        # Preprocess every image first, then stack once: the original
        # vstack-ed inside the loop, which copies O(n^2) data.
        arrays = [image_resize_cum_preprocess(global_path_for_images + "/" + subdir + "/" + file_name, size)
                  for file_name in file_names]
        face_arr = np.vstack(arrays) if arrays else None
        label_arr.append(subdir)
        face_arr_global.append(face_arr)
    return {"face_array":face_arr_global, "labels":label_arr}
####################################### Here we save the Train and Test Image Names and Also preprocessed Images along with Labels#################
# split the labels into training and test.
train_names, test_names = splitter(image_path)
# save the per-class train/test file-name lists with pickle.
with open("train_test.txt", "wb") as fp:
    pickle.dump({"train_name":train_names, "test_name":test_names}, fp)
# let's save the array type labelled images.
dict_face = labeller(image_path, train_names)
# save this dictionary (stacked face arrays + person labels) with pickle.
with open("labelled_pkl", "wb") as fp:
    pickle.dump(dict_face, fp)
######################################################## FaceNet Model Construction#########################################################
"""## Let's construct FaceNet model for Recognition."""
# Reload the preprocessed faces/labels written above.
with open("labelled_pkl", "rb") as fp:
    data = pickle.load(fp)
# Report the per-class and total example counts.
tot_data = 0
for idx, class_name in enumerate(listdir(image_path)):
    print(f"No. of examples in '{class_name}' class = {data['face_array'][idx].shape[0]}")
    tot_data += data['face_array'][idx].shape[0]
print(f"Total data = {tot_data}")
print(100*"#")
# load FaceNet Keras pre-trained model by Hiroki Taniai trained on MS-Celeb-1M dataset
model = load_model("model/facenet_keras.h5", compile=True)
model.load_weights("weights/facenet_keras_weights.h5")
print("Shape of FaceNet model inputs: ", model.inputs)
print("Shape of FaceNet model outputs: ", model.outputs)
print(100*"#")
print("Fine-Tuning: Name of all those Top layers allowed for fine-tuning")
for i, layer in enumerate(model.layers[-19:-1]):
    # BUG FIX: the original `i + ". "` adds an int to a str and raises
    # TypeError on the first iteration; format explicitly instead.
    print(f"{i}. {layer.name}")
print(100*"#")
def lossless_triplet_loss(y_pred, y_true = None, N = 128, beta=None, epsilon=1e-8):
    """
    Implementation of the lossless triplet loss function.

    Arguments:
    y_pred -- tensor whose rows are the concatenated embeddings [A | P | N]:
        anchor -- the encodings for the anchor data
        positive -- the encodings for the positive data (similar to anchor)
        negative -- the encodings for the negative data (different from anchor)
    y_true -- true labels; required by the Keras loss signature but unused here
    N -- the number of embedding dimensions per image
    beta -- the scaling factor; N is recommended (used when beta is None)
    epsilon -- small constant to prevent ln(0)

    Returns:
    loss -- per-example loss tensor
    """
    if beta is None:
        beta = N  # recommended
    # Slice the concatenated prediction into the three N-dim embeddings.
    anchor = tf.convert_to_tensor(y_pred[:,0:N])
    positive = tf.convert_to_tensor(y_pred[:,N:N*2])
    negative = tf.convert_to_tensor(y_pred[:,N*2:N*3])
    # distance between the anchor and the positive
    pos_dist = tf.reduce_sum(tf.square(tf.subtract(anchor,positive)),1)
    # distance between the anchor and the negative
    neg_dist = tf.reduce_sum(tf.square(tf.subtract(anchor,negative)),1)
    # Non-linear mapping: -ln(-x/N + 1).
    # BUG FIX: tf.log was removed in TensorFlow 2.x (this file already uses
    # TF2-only APIs such as tf.config.list_physical_devices); use tf.math.log.
    pos_dist = -tf.math.log(-tf.divide((pos_dist),beta)+1+epsilon)
    neg_dist = -tf.math.log(-tf.divide((N-neg_dist),beta)+1+epsilon)
    # compute loss
    loss = neg_dist + pos_dist
    return loss
def triplet_loss(y_pred, y_true = None, alpha = 0.9):
    """
    Implementation of the standard (margin) triplet loss function.

    Arguments:
    y_pred -- tensor whose rows are the concatenated 3x128-dim embeddings:
        anchor -- the encodings for the anchor data
        positive -- the encodings for the positive data (similar to anchor)
        negative -- the encodings for the negative data (different from anchor)
    y_true -- true labels; required by the Keras loss signature but unused here
    alpha -- margin enforced between the positive and negative distances

    Returns:
    loss -- scalar, sum of max(d(A,P) - d(A,N) + alpha, 0) over the batch
    """
    # (y_true is intentionally ignored; the original `del y_true; gc.collect()`
    # freed a reference to None and did nothing useful — removed, since this
    # runs on every training step.)
    anchor = y_pred[:,0:128]
    positive = y_pred[:,128:256]
    negative = y_pred[:,256:384]
    # distance between the anchor and the positive
    pos_dist = K.sum(K.square(anchor-positive), axis=-1)
    # distance between the anchor and the negative
    neg_dist = K.sum(K.square(anchor-negative), axis=-1)
    # compute loss: hinge on the margin, summed over the batch
    basic_loss = pos_dist-neg_dist+alpha
    loss = K.sum(K.maximum(basic_loss,0.0), axis=0)
    return loss
"""class TripletLossLayer(Layer):
def __init__(self, alpha, **kwargs):
self.alpha = alpha
super(TripletLossLayer, self).__init__(**kwargs)
def triplet_loss(self, inputs):
anchor, positive, negative = inputs
p_dist = K.sum(K.square(anchor-positive), axis=-1)
n_dist = K.sum(K.square(anchor-negative), axis=-1)
return K.sum(K.maximum(p_dist - n_dist + self.alpha, 0), axis=0)
def call(self, inputs):
loss = self.triplet_loss(inputs)
self.add_loss(loss)
return loss
"""
def triplet_loss_model(pre_trained_model, in_dims = (160, 160, 3), margin = 0.9):
    """Assemble a trainable triplet network around *pre_trained_model*.

    Three image inputs (anchor, positive, negative) are embedded by the shared
    base network, each embedding is L2-normalized, and the three vectors are
    concatenated into a single output tensor so a triplet loss can be applied
    to it. (`margin` is currently unused; the loss supplies its own alpha.)

    Returns the Keras Model mapping [anchor, positive, negative] images to
    the concatenated normalized embeddings.
    """
    # Triplet inputs: anchor, positive (same person), negative (different person).
    anchor_in = Input(shape=in_dims)
    pos_in = Input(shape=in_dims)
    neg_in = Input(shape=in_dims)
    # Shared base network produces one embedding per input image.
    # NOTE(review): the original comments suggest the base model was meant to
    # run in inference mode (frozen BatchNorm statistics) — confirm whether
    # `training=False` should be passed here.
    anchor_embed = pre_trained_model(anchor_in)
    pos_embed = pre_trained_model(pos_in)
    neg_embed = pre_trained_model(neg_in)
    # L2-normalize each embedding so distances live on the unit hypersphere.
    anchor_embed = Lambda(lambda t: tf.math.l2_normalize(t, axis=1))(anchor_embed)
    pos_embed = Lambda(lambda t: tf.math.l2_normalize(t, axis=1))(pos_embed)
    neg_embed = Lambda(lambda t: tf.math.l2_normalize(t, axis=1))(neg_embed)
    # Freeze everything except the top 19 layers, then additionally freeze the
    # very last layer (Batch Normalization) so its learnt statistics stay fixed.
    for layer in pre_trained_model.layers[:-19]:
        layer.trainable = False
    pre_trained_model.layers[-1].trainable = False
    # Single output: [anchor | positive | negative] embeddings side by side.
    concatenated_embeddings = Concatenate()([anchor_embed, pos_embed, neg_embed])
    final_model = Model(inputs = [anchor_in, pos_in, neg_in], outputs = concatenated_embeddings)
    return final_model
# Build the trainable triplet network around the pre-trained FaceNet base.
model_triplet = triplet_loss_model(model)
# construct triplet dataset.
def get_batch_random(X, batch_size):
    """
    Create a batch of APN triplets with a completely random strategy.

    Arguments:
    X -- list of per-class image arrays, each of shape (m_i, w, h, c)
    batch_size -- integer, number of triplets to draw

    Returns:
    triplets -- list of 3 arrays A, P, N, each of shape (batch_size, w, h, c)
    """
    m, w, h, c = X[0].shape
    # Generalized: derive the class count from the data instead of the
    # original hard-coded 5 (which broke for any other number of classes).
    nb_classes = len(X)
    # BUG FIX: allocate with the same (w, h) axis order as the source images;
    # the original swapped them, which only worked because images are square.
    triplets = [np.zeros((batch_size, w, h, c)) for _ in range(3)]
    for i in range(batch_size):
        # Pick one random class for the anchor, then two distinct samples
        # from it for A and P.
        anchor_class = np.random.randint(0, nb_classes)
        nb_sample_available_for_class_AP = X[anchor_class].shape[0]
        idx_A, idx_P = np.random.choice(nb_sample_available_for_class_AP, size=2, replace=False)
        # Pick another class for N, guaranteed different from anchor_class.
        negative_class = (anchor_class + np.random.randint(1, nb_classes)) % nb_classes
        # Pick a random sample from the negative class.
        idx_N = np.random.randint(0, X[negative_class].shape[0])
        triplets[0][i] = X[anchor_class][idx_A]
        triplets[1][i] = X[anchor_class][idx_P]
        triplets[2][i] = X[negative_class][idx_N]
    return triplets
def get_batch_hard(X, draw_batch_size, hard_batchs_size, norm_batchs_size, network):
    """
    Create a batch of APN "hard" triplets by mining a larger random draw.

    Arguments:
    X -- dataset: list of per-class image arrays
    draw_batch_size -- integer: number of initial randomly taken samples
    hard_batchs_size -- integer: number of hardest samples to keep
    norm_batchs_size -- integer: number of extra random samples to add
    network -- triplet model whose predictions are [A | P | N] embeddings
    Note: try to make hard_batchs_size + norm_batchs_size = draw_batch_size.

    Returns:
    triplets -- list of 3 arrays A, P, N of shape (hard_batchs_size+norm_batchs_size, w, h, c)
    """
    m, w, h, c = X[0].shape
    # Step 1: draw a larger random candidate batch to mine from.
    studybatch = get_batch_random(X, draw_batch_size)
    # Step 2: score each candidate with the current network: d(A,P) - d(A,N).
    # The alpha margin is omitted since we only need the ordering.
    # (Removed the original dead store `studybatchloss = np.zeros(...)`,
    # which was overwritten immediately below.)
    predictions = network.predict([studybatch[0], studybatch[1], studybatch[2]])
    A, P, N = predictions[:, :128], predictions[:, 128:256], predictions[:, 256:384]
    studybatchloss = np.sum(np.square(A-P), axis=1) - np.sum(np.square(A-N), axis=1)
    # Keep the hardest triplets (largest loss first)...
    selection = np.argsort(studybatchloss)[::-1][:hard_batchs_size]
    # ...plus norm_batchs_size random triplets from the remaining candidates.
    selection2 = np.random.choice(np.delete(np.arange(draw_batch_size), selection), norm_batchs_size, replace=False)
    selection = np.append(selection, selection2)
    triplets = [studybatch[0][selection], studybatch[1][selection], studybatch[2][selection]]
    return triplets
############################################### Generate Triplets ##############################################################
#triplets_data = get_batch_hard(data['face_array'], 300, 200, 100, triplet_loss_model(model))
# Number of random APN triplets to build from the labelled face arrays.
amt_of_data = int(input("Enter amount of Triplets you want to generate (Make it more than 1500): "))
triplets_data = get_batch_random(data['face_array'], amt_of_data)
############################################### Start Training ################################################################
# Instantiate an optimizer.
optimizer = Adam(learning_rate=1e-5)
# Prepare the training dataset.
batch_size = int(input("Please mention a batch-size for Training: "))
# Keys match the triplet model's three Input layers (input_1/2/3).
train_dataset = tf.data.Dataset.from_tensor_slices({"input_1": triplets_data[0], "input_2": triplets_data[1], "input_3": triplets_data[2]})
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(batch_size)
epochs = int(input("Enter no. of Epochs: "))
# Custom training loop: forward pass under GradientTape, triplet loss,
# then one optimizer step per batch.
for epoch in range(epochs):
    print('Start of epoch %d' % epoch)
    print(50*"=")
    # Iterate over the batches of the dataset.
    for step, x_train_batch in enumerate(train_dataset):
        # Open a GradientTape to record the operations run
        # during the forward pass, which enables autodifferentiation.
        with tf.GradientTape() as tape:
            # Run the forward pass of the layer.
            # The operations that the layer applies to its inputs are going to be recorded on the GradientTape.
            # Logits for this minibatch
            logits = model_triplet([x_train_batch["input_1"], x_train_batch["input_2"], x_train_batch["input_3"]], training=True)
            # Compute the loss value for this minibatch.
            loss_value = triplet_loss(y_pred=logits)
        # Use the gradient tape to automatically retrieve the gradients of the trainable variables with respect to the loss.
        grads = tape.gradient(loss_value, model_triplet.trainable_weights)
        # Run one step of gradient descent by updating the value of the variables to minimize the loss.
        optimizer.apply_gradients(zip(grads, model_triplet.trainable_weights))
        # Log every 5 batches.
        if step % 5 == 0:
            print('Training loss (for one batch) at step %s: %s' % (step, float(loss_value)))
            print('Seen so far: %s samples' % ((step + 1) * batch_size))
    # Blank lines between epochs keep the console output readable.
    print("\n\n")
# Save the trained model.
model_triplet.save("FaceNet.h5", save_format='h5')
################################################# Embeddings of Anchor Image #################################################
# Function to create Anchor Embeddings
def create_anchor_embedding(label_file, image_folder_path, trained_model):
    """Compute one reference ("anchor") embedding per person.

    Arguments:
    label_file -- dict with a 'test_name' list of per-class file-name lists
    image_folder_path -- root folder holding one sub-directory per person
    trained_model -- triplet network whose output rows are [A | P | N], i.e.
                     three concatenated 128-dim embeddings

    Returns:
    dict mapping person name -> (1, 128) anchor embedding array
    """
    embed_lst = {}
    for image_folder in label_file['test_name']:
        # The first held-out image of each person serves as that person's anchor.
        fold_name = split_text(image_folder[0])
        img_path = image_folder_path + '/' + fold_name + '/' + image_folder[0]
        prep_img = image_resize_cum_preprocess(img_path, size = (160, 160))
        # Feed the same image as A, P and N; keep only the anchor's 128 dims.
        # BUG FIX: the original sliced rows with [:128], a no-op on the
        # (1, 384) prediction, so the full concatenated output was stored;
        # slice the embedding axis instead.
        pred = trained_model.predict([prep_img, prep_img, prep_img])[:, :128]
        embed_lst[fold_name] = pred
    return embed_lst
# Import train and Test labels.
with open("train_test.txt", "rb") as fp:
    train_test_label = pickle.load(fp)
# Generate anchor embeddings which represent the corresponding person's face and against which we will match our test image embedding.
embeddings = create_anchor_embedding(train_test_label, "cropped_images", model_triplet)
# Dump these embeddings into a .txt file (pickled binary, despite the extension).
with open("embeddings.txt", "wb") as fpp:
    pickle.dump(embeddings, fpp)