# dif.py
import skimage.color
import matplotlib.pyplot as plt
import numpy as np
import cv2
import os
import time
import collections
class dif:
    """Find duplicate or similar images in one or two directories.

    Every readable image is decoded, reduced to a small square 3-channel
    matrix and compared pairwise (in four 90-degree orientations) using the
    mean squared error (MSE). Pairs whose MSE is below the threshold of the
    selected similarity grade are reported as duplicates.

    Attributes set by the constructor:
        result (dict)..........filename -> {"location": path, "duplicates": [paths]}
        lower_quality (list)...for every matching pair, the path of the file
                               with the smaller size on disk (may repeat)
        stats (dict)...........summary of the run (directories, duration,
                               similarity grade, number of images searched/found)
    """

    def __init__(self, directory_A, directory_B=None, similarity="normal", px_size=50, sort_output=False, show_output=False, show_progress=False, delete=False, silent_del=False):
        """
        directory_A (str)......folder path to search for duplicate/similar images
        directory_B (str)......optional second folder path; when given (and it is a
                               different folder), images of directory_A are compared
                               against images of directory_B
        similarity (str)......."normal" = searches for duplicates, recommended setting, MSE < 200
                               "high" = searches for exact duplicates, extremely sensitive to details, MSE < 0.1
                               "low" = searches for similar images, MSE < 1000
        px_size (int)..........recommended not to change default value
                               images are resized to px_size x px_size pixels before being compared
                               the higher the pixel size, the more computational resources and time required
        sort_output (bool).....False = adds the duplicate images to the output dictionary in the order they were found
                               True = sorts the output dictionary alphabetically by filename
        show_output (bool).....False = omits the output and doesn't show found images
                               True = shows duplicate/similar images found in output
        show_progress (bool)...False = prints no progress information
                               True = shows where the lengthy processing currently is
        delete (bool)..........! please use with care, as this cannot be undone
                               the smaller (by file size) file of every duplicate pair is deleted
        silent_del (bool)......! please use with care, as this cannot be undone
                               True = skips asking for user confirmation before deleting
                               only has an effect if "delete" AND "silent_del" are both True

        Raises FileNotFoundError for a missing directory and ValueError for an
        invalid parameter value.
        """
        start_time = time.time()
        print("DifPy process initializing...", end="\r")

        # Directories are checked first, then the remaining parameters.
        dif._process_directory(directory_A)
        if directory_B is None:
            directory_B = directory_A
        else:
            dif._process_directory(directory_B)
        dif._validate_parameters(sort_output, show_output, show_progress, similarity, px_size, delete, silent_del)

        # Compare the directories themselves, not their spelling: "photos" and
        # "photos/" are the same folder. Treating them as two folders would
        # match every file with itself and, with delete=True, remove it.
        if os.path.samefile(directory_A, directory_B):
            result, lower_quality, total = dif._search_one_dir(directory_A,
                                                               similarity, px_size,
                                                               sort_output, show_output, show_progress)
        else:
            result, lower_quality, total = dif._search_two_dirs(directory_A, directory_B,
                                                                similarity, px_size,
                                                                sort_output, show_output, show_progress)

        end_time = time.time()
        time_elapsed = np.round(end_time - start_time, 4)
        stats = dif._generate_stats(directory_A, directory_B,
                                    time.localtime(start_time), time.localtime(end_time), time_elapsed,
                                    similarity, total, len(result))

        self.result = result
        self.lower_quality = lower_quality
        self.stats = stats

        images = "image" if len(result) == 1 else "images"
        print("Found", len(result), images, "with one or more duplicate/similar images in", time_elapsed, "seconds.")

        if result and delete:
            if silent_del:
                dif._delete_imgs(set(lower_quality))
            else:
                usr = input("Are you sure you want to delete all lower resolution duplicate images? \nThis cannot be undone. (y/n)")
                if str(usr) == "y":
                    dif._delete_imgs(set(lower_quality))
                else:
                    print("Image deletion canceled.")

    # Function that searches one directory for duplicate/similar images
    @staticmethod
    def _search_one_dir(directory_A, similarity="normal", px_size=50, sort_output=False, show_output=False, show_progress=False):
        """Compare every image of directory_A with every later image of the same folder.

        Returns (result, lower_quality, total) where total is the number of
        images that could be loaded.
        """
        img_matrices_A, filenames_A = dif._create_imgs_matrix(directory_A, px_size)
        total = len(img_matrices_A)
        result = {}
        lower_quality = []
        ref = dif._map_similarity(similarity)

        for count_A, imageMatrix_A in enumerate(img_matrices_A):
            if show_progress:
                dif._show_progress(count_A, img_matrices_A)
            # only look at later images so each unordered pair is tested once
            for count_B in range(count_A + 1, total):
                match = dif._match_with_rotations(imageMatrix_A, img_matrices_A[count_B], ref)
                if match is None:
                    continue
                rotated_B, err = match
                dif._register_match(result, lower_quality, directory_A, directory_A,
                                    filenames_A[count_A], filenames_A[count_B],
                                    imageMatrix_A, rotated_B, err, show_output)

        if sort_output:
            result = collections.OrderedDict(sorted(result.items()))
        return result, lower_quality, total

    # Function that searches two directories for duplicate/similar images
    @staticmethod
    def _search_two_dirs(directory_A, directory_B=None, similarity="normal", px_size=50, sort_output=False, show_output=False, show_progress=False):
        """Compare every image of directory_A with every image of directory_B.

        Returns (result, lower_quality, total) where total is the number of
        images loaded from both folders together.
        """
        img_matrices_A, filenames_A = dif._create_imgs_matrix(directory_A, px_size)
        img_matrices_B, filenames_B = dif._create_imgs_matrix(directory_B, px_size)
        total = len(img_matrices_A) + len(img_matrices_B)
        result = {}
        lower_quality = []
        ref = dif._map_similarity(similarity)

        for count_A, imageMatrix_A in enumerate(img_matrices_A):
            if show_progress:
                dif._show_progress(count_A, img_matrices_A)
            for count_B, imageMatrix_B in enumerate(img_matrices_B):
                match = dif._match_with_rotations(imageMatrix_A, imageMatrix_B, ref)
                if match is None:
                    continue
                rotated_B, err = match
                dif._register_match(result, lower_quality, directory_A, directory_B,
                                    filenames_A[count_A], filenames_B[count_B],
                                    imageMatrix_A, rotated_B, err, show_output)

        if sort_output:
            result = collections.OrderedDict(sorted(result.items()))
        return result, lower_quality, total

    # Function that compares two image matrices in all four 90 degree orientations
    @staticmethod
    def _match_with_rotations(imageMatrix_A, imageMatrix_B, ref):
        """Return (rotated_B, mse) for the first orientation of B with mse < ref, else None.

        B is tried as given and then after one, two and three successive
        90-degree rotations.
        """
        candidate = imageMatrix_B
        for rotation in range(4):
            if rotation:
                candidate = dif._rotate_img(candidate)
            err = dif._mse(imageMatrix_A, candidate)
            if err < ref:
                return candidate, err
        return None

    # Function that records a found duplicate pair in the output containers
    @staticmethod
    def _register_match(result, lower_quality, directory_A, directory_B, filename_A, filename_B, imageMatrix_A, imageMatrix_B, err, show_output):
        """Store one matching pair in result / lower_quality (both mutated in place).

        When show_output is true the two matrices and their (shortened) paths
        are displayed as well.
        """
        if show_output:
            dif._show_img_figs(imageMatrix_A, imageMatrix_B, err)
            dif._show_file_info(str("..." + directory_A[-35:]) + "/" + filename_A,
                                str("..." + directory_B[-35:]) + "/" + filename_B)

        entry = result.setdefault(filename_A, {"location": directory_A + "/" + filename_A,
                                               "duplicates": []})
        entry["duplicates"].append(directory_B + "/" + filename_B)

        try:
            _, low = dif._check_img_quality(directory_A, directory_B, filename_A, filename_B)
        except OSError:
            # a file vanished or is unreadable: keep the match, skip the size ranking
            return
        lower_quality.append(low)

    # Function that processes the directories that were input as parameters
    @staticmethod
    def _process_directory(directory):
        """Return directory with a trailing path separator; raise FileNotFoundError if it is not a directory."""
        directory += os.sep
        if not os.path.isdir(directory):
            raise FileNotFoundError("Directory: " + directory + " does not exist")
        return directory

    # Function that validates the input parameters of DifPy
    @staticmethod
    def _validate_parameters(sort_output, show_output, show_progress, similarity, px_size, delete, silent_del):
        """Raise ValueError for the first invalid parameter; return None otherwise."""

        def _check_flag(name, value):
            # equality (not identity) on purpose: 1 and 0 stay accepted
            if value != True and value != False:
                raise ValueError('Invalid value for "' + name + '" parameter.')

        _check_flag("sort_output", sort_output)
        _check_flag("show_output", show_output)
        _check_flag("show_progress", show_progress)
        if similarity not in ["low", "normal", "high"]:
            raise ValueError('Invalid value for "similarity" parameter.')
        # px_size is used as a pixel dimension for the resize, so it must be
        # an integer; anything else would make every image fail to load.
        if not isinstance(px_size, (int, np.integer)) or px_size < 10 or px_size > 5000:
            raise ValueError('Invalid value for "px_size" parameter.')
        _check_flag("delete", delete)
        _check_flag("silent_del", silent_del)

    # Function that converts a decoded image array to exactly three channels
    @staticmethod
    def _to_three_channels(img):
        """Return img as a (height, width, 3) array.

        2-D (grayscale) input and input with fewer than three channels are
        expanded by replicating the first channel; for input with more than
        three channels only the first three are kept.
        """
        if img.ndim == 2:
            return np.stack((img, img, img), axis=-1)
        if img.ndim == 3 and img.shape[2] < 3:
            return np.repeat(img[..., :1], 3, axis=2)
        return img[..., 0:3]

    # Function that creates a list of matrices for each image found in the folders
    @staticmethod
    def _create_imgs_matrix(directory, px_size):
        """Load every decodable file of directory as a px_size x px_size x 3 matrix.

        Sub-folders and files that cannot be decoded as an image are skipped.
        Returns (matrices, filenames) as two lists in matching order.
        """
        directory = dif._process_directory(directory)
        imgs_matrix = []
        img_filenames = []
        for filename in os.listdir(directory):
            path = os.path.join(directory, filename)
            if os.path.isdir(path):
                continue
            try:
                # np.fromfile + imdecode instead of a path-based read
                img = cv2.imdecode(np.fromfile(path, dtype=np.uint8), cv2.IMREAD_UNCHANGED)
                if not isinstance(img, np.ndarray):
                    continue
                # normalise the channel count BEFORE resizing: slicing a 2-D
                # grayscale array with [..., 0:3] would crop it to 3 columns
                img = dif._to_three_channels(img)
                img = cv2.resize(img, dsize=(px_size, px_size), interpolation=cv2.INTER_CUBIC)
            except Exception:
                # best effort: an unreadable/non-image file is simply ignored
                continue
            imgs_matrix.append(img)
            img_filenames.append(filename)
        return imgs_matrix, img_filenames

    # Function that maps the similarity grade to the respective MSE value
    @staticmethod
    def _map_similarity(similarity):
        """Return the MSE threshold: "low" -> 1000, "high" -> 0.1, anything else -> 200."""
        thresholds = {"low": 1000, "high": 0.1}
        return thresholds.get(similarity, 200)

    # Function that calulates the mean squared error (mse) between two image matrices
    @staticmethod
    def _mse(imageA, imageB):
        """Return the summed squared difference of both matrices divided by height * width.

        The sum runs over all channels while the divisor only counts pixels.
        """
        err = np.sum((imageA.astype("float") - imageB.astype("float")) ** 2)
        err /= float(imageA.shape[0] * imageA.shape[1])
        return err

    # Function that plots two compared image files and their mse
    @staticmethod
    def _show_img_figs(imageA, imageB, err):
        """Plot both image matrices side by side with their MSE as the title."""
        fig = plt.figure()
        plt.suptitle("MSE: %.2f" % (err))
        for position, image in enumerate((imageA, imageB), start=1):
            fig.add_subplot(1, 2, position)
            if getattr(image, "ndim", 0) == 3:
                # matrices are decoded by OpenCV (BGR channel order) while
                # matplotlib expects RGB, so reverse the channel axis for display
                image = image[..., ::-1]
            plt.imshow(image, cmap=plt.cm.gray)
            plt.axis("off")
        # show the images
        plt.show()

    # Function for printing filename info of plotted image files
    @staticmethod
    def _show_file_info(imageA, imageB):
        """Print the two file paths of a duplicate pair."""
        print("""Duplicate files:\n{} and \n{}""".format(imageA, imageB))

    # Function that displays a progress bar during the search
    @staticmethod
    def _show_progress(count, img_matrix):
        """Print a single-line progress indicator for item number count of img_matrix."""
        total = len(img_matrix)
        print("DifPy processing images: [{}/{}] [{:.0%}]".format(count, total, count/total), end="\r")
        if count + 1 == total:
            # last item: finish the line with the completed counter
            print("DifPy processing images: [{}/{}] [{:.0%}]".format(count+1, total, (count+1)/total))

    # Function for rotating an image matrix by a 90 degree angle
    @staticmethod
    def _rotate_img(image):
        """Return image rotated by 90 degrees in the plane of its first two axes."""
        return np.rot90(image, k=1, axes=(0, 1))

    # Function for checking the quality of compared images
    @staticmethod
    def _check_img_quality(directoryA, directoryB, imageA, imageB):
        """Return (higher, lower) paths of the two files, ranked by size on disk.

        On equal size the file from directoryA counts as the higher one.
        Raises FileNotFoundError/OSError if a directory or file is missing.
        """
        dirA = dif._process_directory(directoryA)
        dirB = dif._process_directory(directoryB)
        size_imgA = os.stat(os.path.join(dirA, imageA)).st_size
        size_imgB = os.stat(os.path.join(dirB, imageB)).st_size
        path_A = directoryA + "/" + imageA
        path_B = directoryB + "/" + imageB
        if size_imgA >= size_imgB:
            return path_A, path_B
        return path_B, path_A

    # Function that generates a dictionary for statistics around the completed DifPy process
    @staticmethod
    def _generate_stats(directoryA, directoryB, start_time, end_time, time_elapsed, similarity, total_searched, total_found):
        """Build the statistics dictionary; start_time/end_time are time.struct_time values."""
        stats = {}
        stats["directory_1"] = directoryA
        if directoryB is not None:
            stats["directory_2"] = directoryB
        stats["duration"] = {"start_date": time.strftime("%Y-%m-%d", start_time),
                             "start_time": time.strftime("%H:%M:%S", start_time),
                             "end_date": time.strftime("%Y-%m-%d", end_time),
                             "end_time": time.strftime("%H:%M:%S", end_time),
                             "seconds_elapsed": time_elapsed}
        stats["similarity_grade"] = similarity
        stats["similarity_mse"] = dif._map_similarity(similarity)
        stats["total_images_searched"] = total_searched
        stats["total_images_found"] = total_found
        return stats

    # Function for deleting the lower quality images that were found after the search
    @staticmethod
    def _delete_imgs(lower_quality_set):
        """Delete every path in lower_quality_set and print a summary; failures are reported, not raised."""
        deleted = 0
        for path in lower_quality_set:
            print("\nDeletion in progress...", end="\r")
            try:
                os.remove(path)
            except OSError:
                print("Could not delete file:", path, end="\r")
            else:
                print("Deleted file:", path, end="\r")
                deleted += 1
        print("\n***\nDeleted", deleted, "images.")